Skip to content

BoundedToolExecutor

File: brain/agent/tool_executor_v2.py


Purpose

BoundedToolExecutor executes a single tool call with:

  1. Explicit per-tool deadline — not the full turn timeout
  2. Late-write blocking — timed-out threads cannot corrupt the next turn
  3. Typed outcomes — returns ToolOutcome, never raises
  4. Cooperative cancellationDeadlineToken allows well-behaved tools to exit early
  5. Oversized output handling — large results go to ArtifactStore

The executor is stateless (no fields mutated during execute()), so a single instance can be shared across parallel waves.


Construction

executor = BoundedToolExecutor(
    tools_registry=tools,         # AgentToolRegistry
    callbacks=callbacks,           # AgentCallbacks
    event_log=event_log,
    checkpoint_store=checkpoint_store,  # None if not snapshotting writes
    artifact_store=artifact_store,
    thinking_manager=thinking,
    run_context_metadata={...},    # Injected into RunContext.metadata
)

execute() — Public API

outcome: ToolOutcome = executor.execute(
    call=tool_call,
    state=turn_state,
    web_mode="off",
    budget=turn_budget,
    tool_timeout_cap_s=45.0,
)

Pre-execution deadline gate

If budget.is_expired() before execution begins, returns ToolDenied(reason="deadline") immediately — no thread is spawned.

Thread execution

per_tool_s = budget.per_tool_remaining_s(tool_timeout_cap_s)
deadline_token = DeadlineToken.from_budget(budget, tool_timeout_cap_s)
state.set_trace_running(trace_id, call.name)

executor = ThreadPoolExecutor(max_workers=1)
future = executor.submit(_run_tool_call, call, state, ...)

try:
    outcome = future.result(timeout=per_tool_s)
except TimeoutError:
    deadline_token.cancel()           # signal cooperative tools
    state.mark_trace_timed_out(trace_id)
    return ToolTimeout(retryable=retryable, ...)

per_tool_s is at least 5 seconds even if the turn budget is nearly exhausted (budget.per_tool_remaining_s() enforces a 5s minimum).


_run_tool_call() — Thread Body

Runs inside the tool thread. Returns a ToolOutcome; never raises.

Execution sequence

  1. Web mode gate — if call.name == "web_search" and web_mode == "off", return ToolFailure(error="web_search is disabled...").

  2. Build RunContext — sets timeout_s=deadline_token.remaining_s() (the per-tool deadline, not the full turn timeout).

  3. tools.run_with_context(call.name, call.arguments, run_context=run_ctx)

  4. Write confirmation flow — if the tool returns "Write confirmation required":

  5. Call callbacks.on_write_confirm(call.name, arguments, diff=diff_str)
  6. If denied → return ToolDenied(reason="write_denied")
  7. If confirmed → optionally snapshot via CheckpointStore, then re-execute with confirm_write=True

  8. Late-write checkwrites_ok = state.complete_trace(trace_id, latency_ms=elapsed_ms)

  9. If False (thread arrived late) → return ToolTimeout(retryable=False)
  10. All side effects below are skipped

  11. Event logevent_log.log_tool_call(...) (only if state.is_accepting_writes())

  12. Record tool resultstate.record_tool_result(...) (lightweight feed for ReflectionEngine)

  13. Build outcome:

  14. status == "error"ToolFailure
  15. Output ≤ 12,000 chars → ToolExecutionResult
  16. Output > 12,000 chars → ArtifactStore.store()ToolArtifactReference

Write Confirmation Flow

Certain tools (e.g. write_file, edit_file) require explicit user confirmation before executing side effects.

tool returns "Write confirmation required"
generate_diff() or generate_edit_diff()  ← optional preview
callbacks.on_write_confirm(name, args, diff=diff_str)
        ├─ False → ToolDenied(reason="write_denied")
        └─ True  → CheckpointStore.snapshot(paths)  ← optional undo snapshot
                   re-execute with confirm_write=True

Output Serialisation

Tool outputs go through a compaction pipeline before size checking:

_compact_output(result)
    
  If result has "entries" list  truncate to 200 items
  Otherwise  _truncate_payload(result, depth=0)
    - str values: max 3,000 chars
    - lists at depth 0: max 200 items
    - dicts at depth 0: max 80 keys
    - max recursion depth: 4
    
json.dumps(compacted)   check len > 12_000  ArtifactStore

_retryable_on_timeout() Helper

executor._retryable_on_timeout("superpower_generate")
# → reads tool_spec.metadata["retry_on_timeout"]
# → defaults to True if not specified

Used to populate ToolTimeout.retryable, which determines whether the tool is blocked for the rest of the turn.


Dependency Injection

All collaborators are injected at construction time:

Dependency Used for
tools_registry run_with_context(), permission mode, write path extraction
callbacks on_write_confirm(), on_tool_error() (via thinking_manager)
event_log log_tool_call()
checkpoint_store Pre-write snapshot
artifact_store Oversized output storage
thinking_manager on_tool_error() notification
run_context_metadata Injected into RunContext.metadata