ToolScheduler¶
File: brain/agent/tool_scheduler.py
Purpose¶
ToolScheduler partitions a list of ToolCall objects (all calls from one LLM
generation step) into an ordered sequence of waves. Each wave is either:
- A parallel batch of
READ_ONLYcalls with non-overlapping resource keys, or - A sequential single call.
This ensures that write operations — vault mutations, database writes, network calls — never run concurrently with each other or with reads that share the same resource.
The Scheduling Rules¶
| Tool safety class | Has resource overlap? | Wave type |
|---|---|---|
READ_ONLY |
No | Parallel (with other compatible READ_ONLY calls) |
READ_ONLY |
Yes | Sequential (own wave) |
LOCAL_WRITE |
— | Always sequential |
NETWORK |
— | Always sequential |
DESTRUCTIVE |
— | Always sequential |
| Unknown / unspecified | — | Treated as READ_ONLY (conservative) |
A single-call batch always runs as sequential (no concurrency overhead).
Safety Classes¶
Defined by ToolSpec.safety_class (a SafetyClass enum from brain/kernel/).
The scheduler reads this via AgentToolRegistry._get_tool_safety_class().
Parallel-safe classes: READ_ONLY, read_only
Sequential classes: LOCAL_WRITE, local_write, NETWORK, network, DESTRUCTIVE, destructive
Resource Keys¶
Resource keys declare which shared resources a tool accesses. They are set in
ToolSpec.metadata["resource_keys"] as a list of strings.
# Example tool spec metadata
metadata = {
"resource_keys": ["vault", "sqlite:events"],
"tool_timeout_s": 30,
}
If a tool has no resource_keys declared, the tool name itself is used as the
key (conservative: two calls to the same tool cannot run in parallel).
Two READ_ONLY tools can run in the same parallel wave only if their resource
key sets do not overlap.
The schedule() Method¶
waves: list[ToolWave] = scheduler.schedule(
calls=result.tool_calls,
tool_registry=self.tools,
parallel_enabled=True, # global kill-switch
)
Algorithm:
- If only one call, or
parallel_enabled=False→ one sequential wave, done. - Iterate calls left-to-right:
- Non-parallel-safe call → flush any accumulated parallel candidates first, then append a single sequential wave.
READ_ONLYcall with no resource overlap → add to the pending parallel batch.READ_ONLYcall with resource overlap → flush the current batch, start a new one.- Flush any remaining parallel candidates at the end.
_flush_parallel(calls): If the batch has only one call, it becomes a sequential
wave (no concurrency overhead). If it has multiple calls, they become a single
parallel wave.
ToolWave Dataclass¶
@dataclass
class ToolWave:
calls: list[ToolCall]
parallel: bool # True → execute concurrently
resource_keys: frozenset[str] # Combined keys of all calls in this wave
Example¶
Given calls: [local_search(q1), local_search(q2), write_vault(data), web_search(q3)]
Assuming local_search and web_search are READ_ONLY, and write_vault is
LOCAL_WRITE:
Step 1: local_search(q1) — READ_ONLY, resource "local_search"
Step 2: local_search(q2) — READ_ONLY, resource "local_search" ← OVERLAP
→ Flush batch [local_search(q1)] → sequential wave
→ Start new batch [local_search(q2)]
Step 3: write_vault(data) — LOCAL_WRITE
→ Flush batch [local_search(q2)] → sequential wave
→ Sequential wave for write_vault
Step 4: web_search(q3) — READ_ONLY, resource "web_search"
→ Start batch [web_search(q3)]
End: Flush [web_search(q3)] → sequential wave (only one call)
Result waves:
Wave 1: sequential [local_search(q1)]
Wave 2: sequential [local_search(q2)] ← resource overlap forced split
Wave 3: sequential [write_vault(data)]
Wave 4: sequential [web_search(q3)]
If both local searches had different resource keys:
Result waves:
Wave 1: parallel [local_search(q1), local_search(q2)]
Wave 2: sequential [write_vault(data)]
Wave 3: sequential [web_search(q3)]
Harness Integration¶
waves = self._scheduler.schedule(
result.tool_calls, self.tools,
parallel_enabled=self._parallel_enabled
)
for wave in waves:
if wave.parallel:
with ThreadPoolExecutor(max_workers=len(wave.calls)) as pool:
futures = {pool.submit(_exec_call_v2, tc, ...): tc for tc in wave.calls}
for fut in as_completed(futures):
outcome = fut.result()
else:
for tc in wave.calls:
outcome = _exec_call_v2(tc, ...)