ToolScheduler

File: brain/agent/tool_scheduler.py


Purpose

ToolScheduler partitions a list of ToolCall objects (all calls from one LLM generation step) into an ordered sequence of waves. Each wave is either:

  • A parallel batch of READ_ONLY calls with non-overlapping resource keys, or
  • A sequential single call.

This ensures that write operations (vault mutations, database writes, network calls) never run concurrently with any other call, and that reads sharing a resource key are never placed in the same parallel wave.


The Scheduling Rules

Tool safety class       Has resource overlap?   Wave type
READ_ONLY               No                      Parallel (with other compatible READ_ONLY calls)
READ_ONLY               Yes                     New wave (the current parallel batch is flushed first)
LOCAL_WRITE             Any                     Always sequential
NETWORK                 Any                     Always sequential
DESTRUCTIVE             Any                     Always sequential
Unknown / unspecified   Any                     Treated as READ_ONLY (conservative)

A single-call batch always runs as sequential (no concurrency overhead).


Safety Classes

Defined by ToolSpec.safety_class (a SafetyClass enum from brain/kernel/). The scheduler reads this via AgentToolRegistry._get_tool_safety_class().

Parallel-safe classes: READ_ONLY, read_only
Sequential classes: LOCAL_WRITE, local_write, NETWORK, network, DESTRUCTIVE, destructive
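
The two lists above suggest that class names are matched in either upper or lower case. A minimal sketch of such a check follows; the helper is_parallel_safe and the stand-in SafetyClass enum (including its values) are assumptions made for illustration, not the scheduler's actual code. Unknown values fall through to READ_ONLY, as the scheduling rules table states.

```python
from enum import Enum


class SafetyClass(Enum):
    # Stand-in for the enum in brain/kernel/; the values here are assumed.
    READ_ONLY = "read_only"
    LOCAL_WRITE = "local_write"
    NETWORK = "network"
    DESTRUCTIVE = "destructive"


# Lowercased names of the classes that must always run sequentially.
SEQUENTIAL_NAMES = {"local_write", "network", "destructive"}


def is_parallel_safe(safety_class: object) -> bool:
    """Return True if a call with this safety class may join a parallel batch."""
    # Accept an Enum member (use its name) or a plain string / None.
    label = getattr(safety_class, "name", safety_class)
    if str(label).lower() in SEQUENTIAL_NAMES:
        return False
    # READ_ONLY, plus unknown or missing values, which are treated as READ_ONLY.
    return True


print(is_parallel_safe(SafetyClass.READ_ONLY))   # True
print(is_parallel_safe("local_write"))           # False
print(is_parallel_safe(None))                    # True
```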


Resource Keys

Resource keys declare which shared resources a tool accesses. They are set in ToolSpec.metadata["resource_keys"] as a list of strings.

# Example tool spec metadata
metadata = {
    "resource_keys": ["vault", "sqlite:events"],
    "tool_timeout_s": 30,
}

If a tool has no resource_keys declared, the tool name itself is used as the key (conservative: two calls to the same tool cannot run in parallel).

Two READ_ONLY tools can run in the same parallel wave only if their resource key sets do not overlap.
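
The fallback and the overlap rule can be sketched with plain set operations. The helper names effective_resource_keys and can_share_wave, and the tool name read_events, are hypothetical and chosen only for this illustration.

```python
from typing import Any, Mapping


def effective_resource_keys(tool_name: str, metadata: Mapping[str, Any]) -> frozenset[str]:
    """Return the declared resource keys, or the tool name if none are declared."""
    keys = metadata.get("resource_keys") or [tool_name]
    return frozenset(keys)


def can_share_wave(a: frozenset[str], b: frozenset[str]) -> bool:
    """Two READ_ONLY calls may share a parallel wave only if their keys are disjoint."""
    return a.isdisjoint(b)


search_a = effective_resource_keys("local_search", {})
search_b = effective_resource_keys("local_search", {"tool_timeout_s": 30})
events = effective_resource_keys("read_events", {"resource_keys": ["vault", "sqlite:events"]})

print(can_share_wave(search_a, search_b))  # False: both fall back to "local_search"
print(can_share_wave(search_a, events))    # True: no shared key
```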


The schedule() Method

waves: list[ToolWave] = scheduler.schedule(
    calls=result.tool_calls,
    tool_registry=self.tools,
    parallel_enabled=True,   # global kill-switch
)

Algorithm:

  1. If only one call, or parallel_enabled=False → one sequential wave, done.
  2. Iterate calls left-to-right:
       • Non-parallel-safe call → flush any accumulated parallel candidates first, then append a single sequential wave.
       • READ_ONLY call with no resource overlap → add to the pending parallel batch.
       • READ_ONLY call with resource overlap → flush the current batch, start a new one containing this call.
  3. Flush any remaining parallel candidates at the end.

_flush_parallel(calls): If the batch has only one call, it becomes a sequential wave (no concurrency overhead). If it has multiple calls, they become a single parallel wave.
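
The steps above can be sketched as follows. This is a simplified illustration, not the code in tool_scheduler.py: the stand-in ToolCall carries its own safety class and resource keys (the real scheduler looks these up through the tool registry), and the helper _wave is hypothetical.

```python
from dataclasses import dataclass

SEQUENTIAL_NAMES = {"local_write", "network", "destructive"}


@dataclass
class ToolCall:
    # Stand-in for the real ToolCall; fields are assumed for this sketch.
    name: str
    safety_class: str = "READ_ONLY"
    resource_keys: frozenset[str] = frozenset()

    def keys(self) -> frozenset[str]:
        # Fall back to the tool name when no keys are declared.
        return self.resource_keys or frozenset({self.name})


@dataclass
class ToolWave:
    calls: list[ToolCall]
    parallel: bool
    resource_keys: frozenset[str]


def _wave(calls: list[ToolCall], parallel: bool) -> ToolWave:
    # Combine the keys of every call in the wave.
    keys = frozenset().union(*(c.keys() for c in calls))
    return ToolWave(list(calls), parallel, keys)


def _flush_parallel(batch: list[ToolCall], waves: list[ToolWave]) -> None:
    # A one-call batch becomes a sequential wave; larger batches run in parallel.
    if batch:
        waves.append(_wave(batch, parallel=len(batch) > 1))
        batch.clear()


def schedule(calls: list[ToolCall], parallel_enabled: bool = True) -> list[ToolWave]:
    if not calls:
        return []
    if len(calls) == 1 or not parallel_enabled:
        return [_wave(calls, parallel=False)]

    waves: list[ToolWave] = []
    batch: list[ToolCall] = []
    batch_keys: set[str] = set()
    for call in calls:
        if call.safety_class.lower() in SEQUENTIAL_NAMES:
            # Non-parallel-safe: flush pending reads, then run this call alone.
            _flush_parallel(batch, waves)
            batch_keys.clear()
            waves.append(_wave([call], parallel=False))
        elif batch_keys.isdisjoint(call.keys()):
            # No overlap: join the pending parallel batch.
            batch.append(call)
            batch_keys |= call.keys()
        else:
            # Overlap: flush the batch and start a new one with this call.
            _flush_parallel(batch, waves)
            batch.append(call)
            batch_keys = set(call.keys())
    _flush_parallel(batch, waves)
    return waves


calls = [
    ToolCall("local_search", resource_keys=frozenset({"index:a"})),
    ToolCall("local_search", resource_keys=frozenset({"index:b"})),
    ToolCall("write_vault", safety_class="LOCAL_WRITE"),
    ToolCall("web_search"),
]
for wave in schedule(calls):
    print("parallel" if wave.parallel else "sequential", [c.name for c in wave.calls])
```

With distinct keys on the two searches, this yields one parallel wave followed by two sequential waves; if both searches fall back to the key "local_search", it yields four sequential waves instead.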


ToolWave Dataclass

from dataclasses import dataclass

@dataclass
class ToolWave:
    calls: list[ToolCall]          # Calls to execute in this wave
    parallel: bool                 # True → execute concurrently
    resource_keys: frozenset[str]  # Combined keys of all calls in this wave

Example

Given calls: [local_search(q1), local_search(q2), write_vault(data), web_search(q3)]

Assuming local_search and web_search are READ_ONLY, and write_vault is LOCAL_WRITE:

Step 1: local_search(q1) — READ_ONLY, resource "local_search"
Step 2: local_search(q2) — READ_ONLY, resource "local_search" ← OVERLAP
        → Flush batch [local_search(q1)] → sequential wave
        → Start new batch [local_search(q2)]

Step 3: write_vault(data) — LOCAL_WRITE
        → Flush batch [local_search(q2)] → sequential wave
        → Sequential wave for write_vault

Step 4: web_search(q3) — READ_ONLY, resource "web_search"
        → Start batch [web_search(q3)]

End: Flush [web_search(q3)] → sequential wave (only one call)

Result waves:
  Wave 1: sequential [local_search(q1)]
  Wave 2: sequential [local_search(q2)]   ← resource overlap forced split
  Wave 3: sequential [write_vault(data)]
  Wave 4: sequential [web_search(q3)]

If both local searches had different resource keys:

Result waves:
  Wave 1: parallel   [local_search(q1), local_search(q2)]
  Wave 2: sequential [write_vault(data)]
  Wave 3: sequential [web_search(q3)]


Harness Integration

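from concurrent.futures import ThreadPoolExecutor, as_completed

waves = self._scheduler.schedule(
    result.tool_calls, self.tools,
    parallel_enabled=self._parallel_enabled,
)

for wave in waves:
    if wave.parallel:
        # One worker per call; a parallel wave holds only compatible READ_ONLY calls.
        with ThreadPoolExecutor(max_workers=len(wave.calls)) as pool:
            futures = {pool.submit(_exec_call_v2, tc, ...): tc for tc in wave.calls}
            # as_completed yields futures in completion order, not call order.
            for fut in as_completed(futures):
                outcome = fut.result()  # re-raises any exception from the worker
    else:
        # Sequential wave: run the calls one at a time, in order.
        for tc in wave.calls:
            outcome = _exec_call_v2(tc, ...)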
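
Because as_completed yields results in completion order, a harness that needs outcomes matched to calls in their original order has to track that itself. One option, shown here as a sketch rather than as the harness's actual behaviour, is Executor.map, which returns results in input order. The function run_wave and its execute parameter (standing in for _exec_call_v2) are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Sequence, TypeVar

C = TypeVar("C")
R = TypeVar("R")


def run_wave(calls: Sequence[C], execute: Callable[[C], R], parallel: bool) -> list[R]:
    """Run one wave and return outcomes in the same order as `calls`."""
    if parallel and len(calls) > 1:
        with ThreadPoolExecutor(max_workers=len(calls)) as pool:
            # map() yields results in input order, whatever the completion order;
            # an exception raised by a worker is re-raised when its result is read.
            return list(pool.map(execute, calls))
    # Sequential wave: run the calls one at a time, in order.
    return [execute(call) for call in calls]


outcomes = run_wave(["q1", "q2", "q3"], lambda q: f"result for {q}", parallel=True)
print(outcomes)
```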