brain/kernel — SecondBrain Kernel Reference¶
Last updated: 2026-03-02 (as-built, corrected against source)
The kernel is the core OS-layer for all agentic AI in SecondBrain. Chat, tasks, gateway, and domain agents should build on kernel primitives when possible. It provides three things every agent needs:
- RunContext — a lightweight per-request envelope carrying IDs, budget, locale, and safety mode.
- ToolExecutor — policy-enforced, traced, retrying tool execution.
- MemoryBundle — three-tier memory (working, episodic, durable).
Package layout¶
brain/kernel/
├── __init__.py # re-exports the public API
├── run_context.py # RunContext, RunBudget, SafetyMode, LocaleCode
├── contracts.py # ToolSpec, ToolResult, ToolErrorCode, ToolProvenance,
│ # ToolArgSchema, ToolReturnSchema, SafetyClass,
│ # CompiledContextItem, AgentCapabilities
├── memory.py # WorkingMemory, EpisodicMemory, DurableMemory,
│ # MemoryBundle, MemoryEntry, MemoryProtocol
└── tooling/
├── __init__.py # re-exports ToolRegistry, ToolExecutor, ToolPolicy, PolicyAction
├── permissions.py # ToolPolicy, PolicyAction, PolicyDecision
├── registry.py # ToolRegistry, get_default_registry
└── executor.py # ToolExecutor
RunContext¶
from brain.kernel import RunContext, SafetyMode
ctx = RunContext.create(
agent_name="my_agent",
intent="summarise vault",
safety_mode=SafetyMode.SAFE,
session_id="sess_abc",
)
print(ctx.run_id) # "run_<16 hex chars>"
print(ctx.trace_id) # "<32 hex chars>"
print(ctx.is_safe_mode()) # True
print(ctx.elapsed_ms()) # float — wall-clock ms since creation
RunContext fields¶
| Field | Type | Default | Notes |
|---|---|---|---|
run_id |
str |
run_<uuid16> |
Stable per-run identifier |
trace_id |
str |
<uuid32> |
For OTEL span correlation |
session_id |
str |
"" |
Chat/API session scope |
user_sid |
str |
"" |
End-user session identifier |
agent_name |
str |
"default" |
Name of the running agent |
intent |
str |
"" |
Human-readable goal/intent |
locale |
LocaleCode |
EN_US |
en-US / en-IN / hi-IN |
safety_mode |
SafetyMode |
NORMAL |
Controls ToolPolicy evaluation |
budget |
RunBudget |
defaults | Resource limits + accumulators |
metadata |
dict |
{} |
Pass-through; used by AgenticRuntime (dry_run) |
SafetyMode values¶
| Value | Meaning |
|---|---|
NORMAL |
Default — all tools allowed |
SAFE |
Read-only + no network |
STRICT |
Maximal restriction (same as SAFE + redaction intent) |
is_safe_mode()returnsTruefor bothSAFEandSTRICT.
RunBudget¶
from brain.kernel import RunBudget
budget = RunBudget(
max_tool_calls=10,
max_prompt_tokens=4000,
max_latency_ms=30_000,
max_cost_usd_cents=5.0,
)
ctx = RunContext.create(agent_name="my_agent", budget=budget)
RunBudget fields:
| Field | Type | Default | Notes |
|---|---|---|---|
max_tool_calls |
int |
20 |
Hard cap on total tool calls |
max_retries_per_tool |
int |
3 |
Per-spec retry limit |
max_prompt_tokens |
int |
4000 |
Prompt token budget |
max_latency_ms |
int |
30_000 |
Wall-clock cap in ms |
max_cost_usd_cents |
float |
10.0 |
Cost ceiling |
tool_calls_used |
int |
0 |
Accumulator (mutated in-place) |
prompt_tokens_used |
int |
0 |
Accumulator |
latency_ms_used |
float |
0.0 |
Accumulator |
cost_usd_cents_used |
float |
0.0 |
Accumulator |
Key methods:
ctx.budget.remaining_tool_calls() # int: max_tool_calls - tool_calls_used
ctx.budget.budget_exceeded() # bool: True if any cap is hit
ctx.budget.to_dict() # serialise all fields
RunContext.metadata — pass-through slot¶
metadata is a free dict threaded through all kernel calls. Consumers:
# AgenticRuntime dry-run (plan only, no tool calls)
ctx.metadata["dry_run"] = True
# Any agent-specific data
ctx.metadata["user_context"] = {...}
Tool contracts¶
SafetyClass hierarchy¶
ToolPolicy uses this order when evaluating which tools are permitted for a given SafetyMode.
ToolSpec¶
from brain.kernel import ToolSpec, SafetyClass
spec = ToolSpec(
tool_id="myagent.do_thing", # ^[a-z][a-z0-9_\-\.]+$
name="Do Thing",
description="...",
safety_class=SafetyClass.READ_ONLY,
idempotent=True,
max_retries=2,
timeout_ms=5_000,
allowlisted_agents=["my_agent"], # empty list = all agents allowed
emit_trace=True,
emit_metric=True,
)
Note: The field is
allowlisted_agents, notallowed_agents.
ToolSpec fields:
| Field | Type | Default | Notes |
|---|---|---|---|
tool_id |
str |
required | Regex ^[a-z][a-z0-9_\-\.]+$ |
name |
str |
required | Human-readable display name |
description |
str |
"" |
Shown to LLM for tool selection |
safety_class |
SafetyClass |
READ_ONLY |
Risk level |
idempotent |
bool |
True |
Safe to retry without side-effects |
args_schema |
list[ToolArgSchema] |
[] |
Argument declarations |
returns_schema |
ToolReturnSchema |
default | Return type hint |
requires_permission |
list[str] |
[] |
e.g. ["calendar.read"] |
allowlisted_agents |
list[str] |
[] |
Empty = all agents OK |
emit_trace |
bool |
True |
Emit OTEL span per call |
emit_metric |
bool |
True |
Emit metrics per call |
max_retries |
int |
2 |
0–5 |
timeout_ms |
int |
5_000 |
100–60 000 ms |
Tool contract fingerprints¶
Every ToolSpec can produce a deterministic governed-contract fingerprint:
The fingerprint covers the tool id, name, description, safety class, idempotency, argument schema, return schema, required permissions, allowlisted agents, and selected governance metadata such as required identity scopes and side-effect risk tags. Runtime-only metadata and callables are excluded.
ToolRegistry.register(...) records the fingerprint seen at registration time.
ToolExecutor compares that value before execution:
contract_enforcement="observe"logs mismatches without blocking.contract_enforcement="enforce"denies changed local-write, network, destructive, andside_effect_level=external_writecontracts before policy, budget, or execution.contract_enforcement="off"logs the current fingerprint without comparing against the registration baseline.
Tool-call audit metadata includes tool_contract_fingerprint,
tool_contract_registered_fingerprint, tool_contract_status,
tool_contract_enforcement, and tool_contract_mismatch.
Persisted operator baselines live in the runtime database and are managed with
the runtime-guard CLI:
sb runtime-guard tools --surface kernel --json
sb runtime-guard snapshot --surface kernel --approved-by local-operator
sb runtime-guard mismatches --surface kernel --json
sb runtime-guard approve-contract kernel vision.analyze
These baselines compare the current live registry against the last approved contract fingerprints across process restarts. They do not replace the registration-time check above; they add an operator approval layer for changed, new, or missing contracts.
Execution enforcement can consume the same baseline store. When a
ToolExecutor is created with contract_baseline_store=... and
contract_enforcement="enforce", high-risk contracts are denied before policy,
budget, or callable execution if they are new, changed from the approved
baseline, or the baseline check fails. Read-only contracts remain observable.
Convenience shortcut on ToolRegistry:
registry.register_fn(
tool_id="myagent.do_thing",
name="Do Thing",
fn=my_callable,
safety_class=SafetyClass.READ_ONLY,
description="...",
idempotent=True,
)
ToolArgSchema and ToolReturnSchema¶
from brain.kernel.contracts import ToolArgSchema, ToolReturnSchema
ToolArgSchema(name="path", type="string", required=True, description="Vault path")
ToolReturnSchema(type="object", description="File contents dict")
ToolResult¶
from brain.kernel import ToolResult, ToolErrorCode
# Success
result = ToolResult.ok("myagent.do_thing", {"items": [1, 2, 3]}, latency_ms=12.5)
assert result.success
assert result.data == {"items": [1, 2, 3]}
# Error
result = ToolResult.error(
"myagent.do_thing",
ToolErrorCode.NOT_FOUND,
"resource missing",
)
assert not result.success
assert result.error_code == ToolErrorCode.NOT_FOUND
ToolResult fields: call_id, tool_id, success, data, error_code,
error_message, latency_ms, retries, trace_id, span_id, provenance.
ToolErrorCode taxonomy¶
| Code | Meaning |
|---|---|
NOT_FOUND |
Requested resource absent |
PERMISSION_DENIED |
Policy blocked the call |
TIMEOUT |
Call exceeded latency budget |
VALIDATION_ERROR |
Bad input arguments |
NETWORK_ERROR |
Network / connectivity failure |
RATE_LIMITED |
Upstream rate limit hit |
BUDGET_EXCEEDED |
Run budget (tool calls) consumed |
UNEXPECTED |
Catch-all for unclassified errors |
ToolProvenance¶
from brain.kernel.contracts import ToolProvenance
prov = ToolProvenance(source_uri="vault/notes.md", freshness_s=60)
result = ToolResult.ok("myagent.read", {"content": "..."}, provenance=prov)
ToolRegistry and ToolExecutor¶
from brain.kernel import (
RunContext, SafetyMode, ToolSpec, SafetyClass
)
from brain.kernel.tooling.registry import ToolRegistry
from brain.kernel.tooling.executor import ToolExecutor
from brain.kernel.tooling.permissions import ToolPolicy
# 1. Build a registry
registry = ToolRegistry()
registry.register(spec, callable_fn) # register spec + callable together
# or shorthand:
registry.register_fn(tool_id="myagent.x", name="X", fn=my_fn)
# 2. Create an executor (ctx is NOT passed to constructor)
executor = ToolExecutor(registry=registry, policy=ToolPolicy.safe_mode())
# 3. Execute — pass ctx as first arg; policy + budget + retries handled automatically
ctx = RunContext.create(agent_name="demo", safety_mode=SafetyMode.SAFE)
result = executor.execute(ctx, "myagent.do_thing", {"arg": "value"})
Common mistake:
ctxgoes toexecute(ctx, ...), not to theToolExecutor(...)constructor. The constructor takes(registry, policy, event_log).
ToolExecutor constructor¶
ToolExecutor(
registry: ToolRegistry,
policy: ToolPolicy | None = None, # defaults to ToolPolicy.normal()
event_log: Any = None, # brain.state.EventLog for structured logging
)
The execute() method also accepts an optional override_fn for testing:
ToolPolicy factory methods¶
| Factory | Allows up to | Network |
|---|---|---|
ToolPolicy.normal(blocked_tools=None, allowed_tools=None) |
DESTRUCTIVE (with approval) |
Yes |
ToolPolicy.safe_mode() |
READ_ONLY only |
No |
ToolPolicy.strict_mode() |
READ_ONLY only |
No |
DESTRUCTIVE tools always return REQUIRE_APPROVAL even in normal mode.
Custom allow/block:
policy = ToolPolicy.normal(
blocked_tools={"myagent.dangerous"},
allowed_tools={"myagent.read", "myagent.summarise"},
)
Execution pipeline (7 steps)¶
- Lookup — find
ToolSpecin registry;NOT_FOUNDif absent. - Policy —
ToolPolicy.evaluate(spec, agent_name, args);PERMISSION_DENIEDif blocked. - Budget —
ctx.budget.remaining_tool_calls() <= 0→BUDGET_EXCEEDED. - Locate callable —
registry.get_callable(tool_id)oroverride_fn;NOT_FOUNDif missing. - Execute with retry — invoke callable; retry up to
spec.max_retrieson exception. - Emit —
_emit(ctx, tool_id, args, result)— logs toevent_logif wired. - Accumulate —
ctx.budget.tool_calls_used += 1;ctx.budget.latency_ms_used += latency.
ToolRegistry query methods¶
registry.list_all() # list[ToolSpec], sorted by tool_id
registry.list_ids() # list[str]
registry.filter_by_safety(SafetyClass.READ_ONLY) # tools at or below given class
registry.filter_for_agent("meeting_copilot") # tools whose allowlisted_agents includes name
len(registry) # int
"myagent.do_thing" in registry # bool
Global default registry¶
from brain.kernel.tooling.registry import get_default_registry
registry = get_default_registry() # process-wide singleton ToolRegistry
MemoryBundle¶
from brain.kernel.memory import MemoryBundle
memory = MemoryBundle.create(
trace_id=ctx.trace_id,
session_id=ctx.session_id,
event_log=event_log, # optional — wires episodic writes to event log
memory_store=mem_store, # optional — wires durable writes to SQLite
)
# Working memory (in-process, lost on exit)
memory.working.put("last_result", {"items": [1, 2]})
val = memory.working.get("last_result")
# Episodic memory (per-trace, written to event log if wired)
memory.episodic.put("step_1_output", {"ok": True}, source="tool.vault_read")
# Durable memory (survives restarts when backed by memory_store)
memory.durable.put("user_preference", "compact", ttl_s=86400.0)
pref = memory.durable.get("user_preference") # "compact"
Note: The TTL parameter is
ttl_s(seconds, float), notttl_seconds.
Memory tiers¶
| Tier | Scope | Persists across restarts | Backed by |
|---|---|---|---|
working |
In-process | No | In-memory dict |
episodic |
Per-trace | No (TTL ~ trace life) | In-memory + optional event log |
durable |
Long-lived | Yes (with memory_store) | SQLite memory table |
All three tiers implement MemoryProtocol:
class MemoryProtocol(Protocol):
def put(self, key: str, value: Any, *, source: str = "", ttl_s: float | None = None) -> None: ...
def get(self, key: str, default: Any = None) -> Any: ...
def delete(self, key: str) -> bool: ... # returns True if key existed
def list_keys(self) -> list[str]: ...
WorkingMemory and EpisodicMemory also expose snapshot() -> dict[str, Any].
MemoryEntry¶
Each value in any tier is stored as a MemoryEntry:
@dataclass
class MemoryEntry:
key: str
value: Any
tier: str # "working" | "episodic" | "durable"
source: str = "" # where this fact came from
confidence: float = 1.0
created_at_s: float = ... # time.time() on creation
ttl_s: float | None = None
def is_expired(self) -> bool: ...
@property
def entry_id(self) -> str: ... # "mem_<tier3>_<key>"
Context injection bridge¶
The kernel's CompiledContextItem (output of the context compiler) can be
produced from the ContextOS injection pipeline:
from brain.context_packs import ContextInjector, injection_plan_to_compiled_items
from brain.contracts.envelopes import ContextEnvelope
from brain.decisions.resolver import DecisionResolver
resolver = DecisionResolver()
spec = resolver.resolve("support.context.injection.v1")
injector = ContextInjector()
plan = injector.build_plan(spec, envelope, pack_store)
items = injection_plan_to_compiled_items(plan) # list[CompiledContextItem]
See brain/context_packs/kernel_bridge.py.
AgentCapabilities¶
Domain agents declare their needs at registration time:
from brain.kernel.contracts import AgentCapabilities, SafetyClass
caps = AgentCapabilities(
agent_name="meeting_copilot",
description="Extracts action items and risks from transcripts",
required_packs=["meeting.transcript"],
tool_allowlist=["meeting.extract_action_items", "meeting.identify_risks"],
decision_keys=["support.meeting.extract", "support.meeting.followup"],
safety_class_ceiling=SafetyClass.READ_ONLY,
version="1.1.0",
)
Decision namespaces¶
The kernel registers the following keys in brain/decisions/namespace.py:
| Key | Meaning |
|---|---|
kernel.tool.dispatch |
Tool execution dispatch decision |
kernel.tool.policy.blocked |
Policy blocked a tool call |
kernel.budget.exceeded |
Run budget consumed |
kernel.memory.put |
Value written to memory |
kernel.memory.get |
Value read from memory |
kernel.run.started |
Agent run started |
kernel.run.finished |
Agent run completed |
CLI commands¶
# Overview
sb kernel info
sb kernel info --json
# Decision namespaces
sb kernel namespaces
sb kernel namespaces --json
# Tool registry (filter examples)
sb kernel tools
sb kernel tools --safety read_only
sb kernel tools --agent meeting_copilot
sb kernel tools --json
Building a kernel-native agent¶
In brief:
from brain.kernel import RunContext, SafetyMode, MemoryBundle
from brain.kernel.tooling.registry import ToolRegistry
from brain.kernel.tooling.executor import ToolExecutor
from brain.kernel.tooling.permissions import ToolPolicy
class MyAgent:
def __init__(self):
self._registry = ToolRegistry()
self._registry.register(MY_SPEC, my_callable)
self._executor = ToolExecutor(
registry=self._registry,
policy=ToolPolicy.safe_mode(),
# event_log=event_log, # optional structured logging
)
def run(self, inp):
ctx = RunContext.create(agent_name="my_agent", safety_mode=SafetyMode.SAFE)
memory = MemoryBundle.create(trace_id=ctx.trace_id)
# ctx goes to execute(), not the constructor
result = self._executor.execute(ctx, "my_agent.do_thing", {"input": inp})
if result.success:
memory.durable.put("last_run", result.data, ttl_s=3600.0)
return result.data
Integration with AgenticRuntime¶
AgenticRuntime uses the kernel for all tool execution. The dry_run flag
is passed via RunContext.metadata:
from brain.agent.agentic import AgenticRuntime
from brain.kernel.run_context import RunContext
ctx = RunContext.create(agent_name="agentic", intent=goal)
ctx.metadata["dry_run"] = True # plan only — no ToolExecutor calls
runtime = AgenticRuntime(tool_registry=registry)
record = runtime.run(goal, ctx)
# record.dry_run is True; record.critique.passed is always True in dry-run
The runtime checks ctx.budget.remaining_tool_calls() <= 0 before each step
and emits a StepArtifact(ok=False, error="budget exhausted") instead of
calling the executor.
Tests¶
# All kernel tests
make test-kernel
# Individual suites
.venv/bin/python -m pytest tests/kernel/test_kernel_run_context.py -q
.venv/bin/python -m pytest tests/kernel/test_kernel_contracts.py -q
.venv/bin/python -m pytest tests/kernel/test_kernel_tooling.py -q
.venv/bin/python -m pytest tests/kernel/test_kernel_memory.py -q
.venv/bin/python -m pytest tests/kernel/test_kernel_cli.py -q