Skip to content

brain/kernel — SecondBrain Kernel Reference

Last updated: 2026-03-02 (as-built, corrected against source)

The kernel is the core OS-layer for all agentic AI in SecondBrain. Chat, tasks, gateway, and domain agents should build on kernel primitives when possible. It provides three things every agent needs:

  1. RunContext — a lightweight per-request envelope carrying IDs, budget, locale, and safety mode.
  2. ToolExecutor — policy-enforced, traced, retrying tool execution.
  3. MemoryBundle — three-tier memory (working, episodic, durable).

Package layout

brain/kernel/
├── __init__.py          # re-exports the public API
├── run_context.py       # RunContext, RunBudget, SafetyMode, LocaleCode
├── contracts.py         # ToolSpec, ToolResult, ToolErrorCode, ToolProvenance,
│                        # ToolArgSchema, ToolReturnSchema, SafetyClass,
│                        # CompiledContextItem, AgentCapabilities
├── memory.py            # WorkingMemory, EpisodicMemory, DurableMemory,
│                        # MemoryBundle, MemoryEntry, MemoryProtocol
└── tooling/
    ├── __init__.py      # re-exports ToolRegistry, ToolExecutor, ToolPolicy, PolicyAction
    ├── permissions.py   # ToolPolicy, PolicyAction, PolicyDecision
    ├── registry.py      # ToolRegistry, get_default_registry
    └── executor.py      # ToolExecutor

RunContext

from brain.kernel import RunContext, SafetyMode

ctx = RunContext.create(
    agent_name="my_agent",
    intent="summarise vault",
    safety_mode=SafetyMode.SAFE,
    session_id="sess_abc",
)
print(ctx.run_id)          # "run_<16 hex chars>"
print(ctx.trace_id)        # "<32 hex chars>"
print(ctx.is_safe_mode())  # True
print(ctx.elapsed_ms())    # float — wall-clock ms since creation

RunContext fields

Field Type Default Notes
run_id str run_<uuid16> Stable per-run identifier
trace_id str <uuid32> For OTEL span correlation
session_id str "" Chat/API session scope
user_sid str "" End-user session identifier
agent_name str "default" Name of the running agent
intent str "" Human-readable goal/intent
locale LocaleCode EN_US en-US / en-IN / hi-IN
safety_mode SafetyMode NORMAL Controls ToolPolicy evaluation
budget RunBudget defaults Resource limits + accumulators
metadata dict {} Pass-through; used by AgenticRuntime (dry_run)

SafetyMode values

Value Meaning
NORMAL Default — all tools allowed
SAFE Read-only + no network
STRICT Maximal restriction (same as SAFE + redaction intent)

is_safe_mode() returns True for both SAFE and STRICT.

RunBudget

from brain.kernel import RunBudget

budget = RunBudget(
    max_tool_calls=10,
    max_prompt_tokens=4000,
    max_latency_ms=30_000,
    max_cost_usd_cents=5.0,
)
ctx = RunContext.create(agent_name="my_agent", budget=budget)

RunBudget fields:

Field Type Default Notes
max_tool_calls int 20 Hard cap on total tool calls
max_retries_per_tool int 3 Per-spec retry limit
max_prompt_tokens int 4000 Prompt token budget
max_latency_ms int 30_000 Wall-clock cap in ms
max_cost_usd_cents float 10.0 Cost ceiling
tool_calls_used int 0 Accumulator (mutated in-place)
prompt_tokens_used int 0 Accumulator
latency_ms_used float 0.0 Accumulator
cost_usd_cents_used float 0.0 Accumulator

Key methods:

ctx.budget.remaining_tool_calls()  # int: max_tool_calls - tool_calls_used
ctx.budget.budget_exceeded()       # bool: True if any cap is hit
ctx.budget.to_dict()               # serialise all fields

RunContext.metadata — pass-through slot

metadata is a free dict threaded through all kernel calls. Consumers:

# AgenticRuntime dry-run (plan only, no tool calls)
ctx.metadata["dry_run"] = True

# Any agent-specific data
ctx.metadata["user_context"] = {...}

Tool contracts

SafetyClass hierarchy

READ_ONLY < LOCAL_WRITE < NETWORK < DESTRUCTIVE

ToolPolicy uses this order when evaluating which tools are permitted for a given SafetyMode.

ToolSpec

from brain.kernel import ToolSpec, SafetyClass

spec = ToolSpec(
    tool_id="myagent.do_thing",          # ^[a-z][a-z0-9_\-\.]+$
    name="Do Thing",
    description="...",
    safety_class=SafetyClass.READ_ONLY,
    idempotent=True,
    max_retries=2,
    timeout_ms=5_000,
    allowlisted_agents=["my_agent"],     # empty list = all agents allowed
    emit_trace=True,
    emit_metric=True,
)

Note: The field is allowlisted_agents, not allowed_agents.

ToolSpec fields:

Field Type Default Notes
tool_id str required Regex ^[a-z][a-z0-9_\-\.]+$
name str required Human-readable display name
description str "" Shown to LLM for tool selection
safety_class SafetyClass READ_ONLY Risk level
idempotent bool True Safe to retry without side-effects
args_schema list[ToolArgSchema] [] Argument declarations
returns_schema ToolReturnSchema default Return type hint
requires_permission list[str] [] e.g. ["calendar.read"]
allowlisted_agents list[str] [] Empty = all agents OK
emit_trace bool True Emit OTEL span per call
emit_metric bool True Emit metrics per call
max_retries int 2 0–5
timeout_ms int 5_000 100–60 000 ms

Tool contract fingerprints

Every ToolSpec can produce a deterministic governed-contract fingerprint:

fingerprint = spec.contract_fingerprint()  # "sha256:<hex>"

The fingerprint covers the tool id, name, description, safety class, idempotency, argument schema, return schema, required permissions, allowlisted agents, and selected governance metadata such as required identity scopes and side-effect risk tags. Runtime-only metadata and callables are excluded.

ToolRegistry.register(...) records the fingerprint seen at registration time. ToolExecutor compares that value before execution:

  • contract_enforcement="observe" logs mismatches without blocking.
  • contract_enforcement="enforce" denies changed local-write, network, destructive, and side_effect_level=external_write contracts before policy, budget, or execution.
  • contract_enforcement="off" logs the current fingerprint without comparing against the registration baseline.

Tool-call audit metadata includes tool_contract_fingerprint, tool_contract_registered_fingerprint, tool_contract_status, tool_contract_enforcement, and tool_contract_mismatch.

Persisted operator baselines live in the runtime database and are managed with the runtime-guard CLI:

sb runtime-guard tools --surface kernel --json
sb runtime-guard snapshot --surface kernel --approved-by local-operator
sb runtime-guard mismatches --surface kernel --json
sb runtime-guard approve-contract kernel vision.analyze

These baselines compare the current live registry against the last approved contract fingerprints across process restarts. They do not replace the registration-time check above; they add an operator approval layer for changed, new, or missing contracts.

Execution enforcement can consume the same baseline store. When a ToolExecutor is created with contract_baseline_store=... and contract_enforcement="enforce", high-risk contracts are denied before policy, budget, or callable execution if they are new, changed from the approved baseline, or the baseline check fails. Read-only contracts remain observable.

Convenience shortcut on ToolRegistry:

registry.register_fn(
    tool_id="myagent.do_thing",
    name="Do Thing",
    fn=my_callable,
    safety_class=SafetyClass.READ_ONLY,
    description="...",
    idempotent=True,
)

ToolArgSchema and ToolReturnSchema

from brain.kernel.contracts import ToolArgSchema, ToolReturnSchema

ToolArgSchema(name="path", type="string", required=True, description="Vault path")
ToolReturnSchema(type="object", description="File contents dict")

ToolResult

from brain.kernel import ToolResult, ToolErrorCode

# Success
result = ToolResult.ok("myagent.do_thing", {"items": [1, 2, 3]}, latency_ms=12.5)
assert result.success
assert result.data == {"items": [1, 2, 3]}

# Error
result = ToolResult.error(
    "myagent.do_thing",
    ToolErrorCode.NOT_FOUND,
    "resource missing",
)
assert not result.success
assert result.error_code == ToolErrorCode.NOT_FOUND

ToolResult fields: call_id, tool_id, success, data, error_code, error_message, latency_ms, retries, trace_id, span_id, provenance.

ToolErrorCode taxonomy

Code Meaning
NOT_FOUND Requested resource absent
PERMISSION_DENIED Policy blocked the call
TIMEOUT Call exceeded latency budget
VALIDATION_ERROR Bad input arguments
NETWORK_ERROR Network / connectivity failure
RATE_LIMITED Upstream rate limit hit
BUDGET_EXCEEDED Run budget (tool calls) consumed
UNEXPECTED Catch-all for unclassified errors

ToolProvenance

from brain.kernel.contracts import ToolProvenance

prov = ToolProvenance(source_uri="vault/notes.md", freshness_s=60)
result = ToolResult.ok("myagent.read", {"content": "..."}, provenance=prov)

ToolRegistry and ToolExecutor

from brain.kernel import (
    RunContext, SafetyMode, ToolSpec, SafetyClass
)
from brain.kernel.tooling.registry import ToolRegistry
from brain.kernel.tooling.executor import ToolExecutor
from brain.kernel.tooling.permissions import ToolPolicy

# 1. Build a registry
registry = ToolRegistry()
registry.register(spec, callable_fn)           # register spec + callable together
# or shorthand:
registry.register_fn(tool_id="myagent.x", name="X", fn=my_fn)

# 2. Create an executor (ctx is NOT passed to constructor)
executor = ToolExecutor(registry=registry, policy=ToolPolicy.safe_mode())

# 3. Execute — pass ctx as first arg; policy + budget + retries handled automatically
ctx = RunContext.create(agent_name="demo", safety_mode=SafetyMode.SAFE)
result = executor.execute(ctx, "myagent.do_thing", {"arg": "value"})

Common mistake: ctx goes to execute(ctx, ...), not to the ToolExecutor(...) constructor. The constructor takes (registry, policy, event_log).

ToolExecutor constructor

ToolExecutor(
    registry: ToolRegistry,
    policy: ToolPolicy | None = None,   # defaults to ToolPolicy.normal()
    event_log: Any = None,              # brain.state.EventLog for structured logging
)

The execute() method also accepts an optional override_fn for testing:

result = executor.execute(ctx, "myagent.do_thing", args, override_fn=mock_fn)

ToolPolicy factory methods

Factory Allows up to Network
ToolPolicy.normal(blocked_tools=None, allowed_tools=None) DESTRUCTIVE (with approval) Yes
ToolPolicy.safe_mode() READ_ONLY only No
ToolPolicy.strict_mode() READ_ONLY only No

DESTRUCTIVE tools always return REQUIRE_APPROVAL even in normal mode.

Custom allow/block:

policy = ToolPolicy.normal(
    blocked_tools={"myagent.dangerous"},
    allowed_tools={"myagent.read", "myagent.summarise"},
)

Execution pipeline (7 steps)

  1. Lookup — find ToolSpec in registry; NOT_FOUND if absent.
  2. PolicyToolPolicy.evaluate(spec, agent_name, args); PERMISSION_DENIED if blocked.
  3. Budgetctx.budget.remaining_tool_calls() <= 0BUDGET_EXCEEDED.
  4. Locate callableregistry.get_callable(tool_id) or override_fn; NOT_FOUND if missing.
  5. Execute with retry — invoke callable; retry up to spec.max_retries on exception.
  6. Emit_emit(ctx, tool_id, args, result) — logs to event_log if wired.
  7. Accumulatectx.budget.tool_calls_used += 1; ctx.budget.latency_ms_used += latency.

ToolRegistry query methods

registry.list_all()                           # list[ToolSpec], sorted by tool_id
registry.list_ids()                           # list[str]
registry.filter_by_safety(SafetyClass.READ_ONLY)   # tools at or below given class
registry.filter_for_agent("meeting_copilot") # tools whose allowlisted_agents includes name
len(registry)                                # int
"myagent.do_thing" in registry               # bool

Global default registry

from brain.kernel.tooling.registry import get_default_registry

registry = get_default_registry()  # process-wide singleton ToolRegistry

MemoryBundle

from brain.kernel.memory import MemoryBundle

memory = MemoryBundle.create(
    trace_id=ctx.trace_id,
    session_id=ctx.session_id,
    event_log=event_log,       # optional — wires episodic writes to event log
    memory_store=mem_store,    # optional — wires durable writes to SQLite
)

# Working memory (in-process, lost on exit)
memory.working.put("last_result", {"items": [1, 2]})
val = memory.working.get("last_result")

# Episodic memory (per-trace, written to event log if wired)
memory.episodic.put("step_1_output", {"ok": True}, source="tool.vault_read")

# Durable memory (survives restarts when backed by memory_store)
memory.durable.put("user_preference", "compact", ttl_s=86400.0)
pref = memory.durable.get("user_preference")   # "compact"

Note: The TTL parameter is ttl_s (seconds, float), not ttl_seconds.

Memory tiers

Tier Scope Persists across restarts Backed by
working In-process No In-memory dict
episodic Per-trace No (TTL ~ trace life) In-memory + optional event log
durable Long-lived Yes (with memory_store) SQLite memory table

All three tiers implement MemoryProtocol:

class MemoryProtocol(Protocol):
    def put(self, key: str, value: Any, *, source: str = "", ttl_s: float | None = None) -> None: ...
    def get(self, key: str, default: Any = None) -> Any: ...
    def delete(self, key: str) -> bool: ...      # returns True if key existed
    def list_keys(self) -> list[str]: ...

WorkingMemory and EpisodicMemory also expose snapshot() -> dict[str, Any].

MemoryEntry

Each value in any tier is stored as a MemoryEntry:

@dataclass
class MemoryEntry:
    key: str
    value: Any
    tier: str           # "working" | "episodic" | "durable"
    source: str = ""    # where this fact came from
    confidence: float = 1.0
    created_at_s: float = ...  # time.time() on creation
    ttl_s: float | None = None

    def is_expired(self) -> bool: ...
    @property
    def entry_id(self) -> str: ...  # "mem_<tier3>_<key>"

Context injection bridge

The kernel's CompiledContextItem (output of the context compiler) can be produced from the ContextOS injection pipeline:

from brain.context_packs import ContextInjector, injection_plan_to_compiled_items
from brain.contracts.envelopes import ContextEnvelope
from brain.decisions.resolver import DecisionResolver

resolver = DecisionResolver()
spec = resolver.resolve("support.context.injection.v1")
injector = ContextInjector()
plan = injector.build_plan(spec, envelope, pack_store)
items = injection_plan_to_compiled_items(plan)  # list[CompiledContextItem]

See brain/context_packs/kernel_bridge.py.


AgentCapabilities

Domain agents declare their needs at registration time:

from brain.kernel.contracts import AgentCapabilities, SafetyClass

caps = AgentCapabilities(
    agent_name="meeting_copilot",
    description="Extracts action items and risks from transcripts",
    required_packs=["meeting.transcript"],
    tool_allowlist=["meeting.extract_action_items", "meeting.identify_risks"],
    decision_keys=["support.meeting.extract", "support.meeting.followup"],
    safety_class_ceiling=SafetyClass.READ_ONLY,
    version="1.1.0",
)

Decision namespaces

The kernel registers the following keys in brain/decisions/namespace.py:

Key Meaning
kernel.tool.dispatch Tool execution dispatch decision
kernel.tool.policy.blocked Policy blocked a tool call
kernel.budget.exceeded Run budget consumed
kernel.memory.put Value written to memory
kernel.memory.get Value read from memory
kernel.run.started Agent run started
kernel.run.finished Agent run completed

CLI commands

# Overview
sb kernel info
sb kernel info --json

# Decision namespaces
sb kernel namespaces
sb kernel namespaces --json

# Tool registry (filter examples)
sb kernel tools
sb kernel tools --safety read_only
sb kernel tools --agent meeting_copilot
sb kernel tools --json

Building a kernel-native agent

In brief:

from brain.kernel import RunContext, SafetyMode, MemoryBundle
from brain.kernel.tooling.registry import ToolRegistry
from brain.kernel.tooling.executor import ToolExecutor
from brain.kernel.tooling.permissions import ToolPolicy

class MyAgent:
    def __init__(self):
        self._registry = ToolRegistry()
        self._registry.register(MY_SPEC, my_callable)
        self._executor = ToolExecutor(
            registry=self._registry,
            policy=ToolPolicy.safe_mode(),
            # event_log=event_log,   # optional structured logging
        )

    def run(self, inp):
        ctx = RunContext.create(agent_name="my_agent", safety_mode=SafetyMode.SAFE)
        memory = MemoryBundle.create(trace_id=ctx.trace_id)
        # ctx goes to execute(), not the constructor
        result = self._executor.execute(ctx, "my_agent.do_thing", {"input": inp})
        if result.success:
            memory.durable.put("last_run", result.data, ttl_s=3600.0)
        return result.data

Integration with AgenticRuntime

AgenticRuntime uses the kernel for all tool execution. The dry_run flag is passed via RunContext.metadata:

from brain.agent.agentic import AgenticRuntime
from brain.kernel.run_context import RunContext

ctx = RunContext.create(agent_name="agentic", intent=goal)
ctx.metadata["dry_run"] = True   # plan only — no ToolExecutor calls

runtime = AgenticRuntime(tool_registry=registry)
record = runtime.run(goal, ctx)
# record.dry_run is True; record.critique.passed is always True in dry-run

The runtime checks ctx.budget.remaining_tool_calls() <= 0 before each step and emits a StepArtifact(ok=False, error="budget exhausted") instead of calling the executor.


Tests

# All kernel tests
make test-kernel

# Individual suites
.venv/bin/python -m pytest tests/kernel/test_kernel_run_context.py  -q
.venv/bin/python -m pytest tests/kernel/test_kernel_contracts.py    -q
.venv/bin/python -m pytest tests/kernel/test_kernel_tooling.py      -q
.venv/bin/python -m pytest tests/kernel/test_kernel_memory.py       -q
.venv/bin/python -m pytest tests/kernel/test_kernel_cli.py          -q