Multi-Agent Orchestrator

MultiAgentOrchestrator fans a task out to multiple sub-agents, runs them in parallel on a ThreadPoolExecutor, and then synthesizes their outputs into a single answer.

Usage

from brain.patterns import MultiAgentOrchestrator, RAGAgent, SubTask

orch = MultiAgentOrchestrator(n_decompose=3, timeout_s=30.0)

# Explicit subtasks
subtasks = [
    SubTask(name="docs",    task="Summarise the API docs",   pattern=RAGAgent(retriever=...)),
    SubTask(name="issues",  task="List open bugs",           pattern=RAGAgent(retriever=...)),
]
result = orch.run("Give me an API health report", subtasks=subtasks)

# Auto-decompose (no subtasks needed)
result = orch.run("Research three aspects of quantum computing")

API Reference

brain.patterns.multi_agent.MultiAgentOrchestrator

MultiAgentOrchestrator(provider: LLMProvider | None = None, max_workers: int = 8, timeout_s: float = 60.0, n_decompose: int = 3)

Bases: BasePattern

Run sub-agents in parallel and synthesize their outputs.

Parameters:

Name          Type                  Default   Description
provider      LLMProvider | None    None      LLMProvider for the decomposition and synthesis steps.
max_workers   int                   8         Upper bound on the thread-pool size; run() uses min(max_workers, number of subtasks).
timeout_s     float                 60.0      Per-agent timeout in seconds.
n_decompose   int                   3         Number of subtasks to auto-decompose into when none are passed to run().
Source code in brain/patterns/multi_agent.py
def __init__(
    self,
    provider: LLMProvider | None = None,
    max_workers: int = 8,
    timeout_s: float = 60.0,
    n_decompose: int = 3,
) -> None:
    self.max_workers = max_workers
    self.timeout_s = timeout_s
    self.n_decompose = n_decompose
    self._provider = provider
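
The timeout_s value is passed to the standard-library calls concurrent.futures.as_completed and Future.result inside run(). As a reminder of how the standard library behaves on its own, the sketch below uses no brain.patterns code; the helper slow and the durations are made up for illustration. A timeout given to as_completed is measured from the call to as_completed, so it bounds the wait for the whole batch, and the TimeoutError is raised by the iterator itself:

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from concurrent.futures import TimeoutError as FuturesTimeout


def slow(seconds, value):
    """Hypothetical stand-in for a sub-agent that takes `seconds` to answer."""
    time.sleep(seconds)
    return value


done = []
timed_out = False
with ThreadPoolExecutor(max_workers=2) as pool:
    futures = [pool.submit(slow, 0.0, "fast"), pool.submit(slow, 0.5, "slow")]
    try:
        # The 0.1 s budget covers the whole iteration, not each future.
        for future in as_completed(futures, timeout=0.1):
            done.append(future.result())
    except FuturesTimeout:
        # Raised by the as_completed iterator once the budget is spent.
        timed_out = True
# Leaving the `with` block still waits for the slow worker to finish.
```

When choosing timeout_s, it may therefore help to treat it as a budget for the slowest sub-agent in the batch.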

run

run(task: str, subtasks: list[SubTask] | None = None, **kwargs: Any) -> PatternResult
Source code in brain/patterns/multi_agent.py
def run(self, task: str, subtasks: list[SubTask] | None = None, **kwargs: Any) -> PatternResult:
    provider = self._get_provider()

    # Build subtasks if not provided
    if not subtasks:
        sub_task_descriptions = self._auto_decompose(task, provider)
        from .react import ReActAgent

        subtasks = [
            SubTask(name=f"agent_{i}", task=desc, pattern=ReActAgent(provider=self._provider))
            for i, desc in enumerate(sub_task_descriptions)
        ]

    workers = min(self.max_workers, len(subtasks))
    sub_results: list[SubTaskResult] = []
    steps: list[Step] = []

    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = {pool.submit(st.pattern.run, st.task): st for st in subtasks}
        for future in as_completed(futures, timeout=self.timeout_s + 1):
            st = futures[future]
            try:
                pat_result = future.result(timeout=self.timeout_s)
                sr = SubTaskResult(name=st.name, task=st.task, result=pat_result)
            except FuturesTimeout:  # silent-ok: fail-soft, surface via metric/log elsewhere
                sr = SubTaskResult(
                    name=st.name,
                    task=st.task,
                    result=PatternResult(answer="", ok=False, error="timeout"),
                    error="timeout",
                )
            except Exception as exc:  # noqa: BLE001  # silent-ok: fail-soft, surface via metric/log elsewhere
                sr = SubTaskResult(
                    name=st.name,
                    task=st.task,
                    result=PatternResult(answer="", ok=False, error=str(exc)),
                    error=str(exc),
                )
            sub_results.append(sr)
            steps.append(
                Step(
                    index=len(steps),
                    action=st.name,
                    action_input=st.task,
                    observation=sr.result.answer if not sr.error else f"ERROR: {sr.error}",
                )
            )

    answer = self._synthesize(task, sub_results, provider)
    ok = any(not sr.error for sr in sub_results)

    return PatternResult(
        answer=answer,
        steps=steps,
        iterations=len(subtasks),
        ok=ok,
        metadata={
            "sub_results": [
                {"name": sr.name, "ok": not sr.error, "answer": sr.result.answer}
                for sr in sub_results
            ]
        },
    )
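
The fan-out and fail-soft shape of run() can be tried in isolation with the standard library alone. The following is a minimal sketch, not the library's implementation: fan_out, failing_job, and the job names are hypothetical, and it keeps only three ideas from the source above (a pool sized to the number of jobs, one recorded result per job even on failure, and an overall ok flag that is true if any job succeeded).

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def fan_out(jobs, max_workers=8):
    """Run zero-argument callables in parallel.

    Returns {name: (ok, value_or_error_text)}. Illustrative only.
    """
    results = {}
    # Never start more threads than there are jobs (and at least one).
    workers = max(1, min(max_workers, len(jobs)))
    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = {pool.submit(fn): name for name, fn in jobs.items()}
        for future in as_completed(futures):
            name = futures[future]
            try:
                results[name] = (True, future.result())
            except Exception as exc:  # fail-soft: record the error, keep going
                results[name] = (False, str(exc))
    return results


def failing_job():
    raise ValueError("boom")


results = fan_out({"docs": lambda: "summary", "issues": failing_job})
# Mirrors `ok = any(not sr.error ...)`: one success is enough.
overall_ok = any(ok for ok, _ in results.values())
```

One failing job does not abort the others; its error text is kept next to its name so a later synthesis step can decide how to report it.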

brain.patterns.multi_agent.SubTask dataclass

SubTask(name: str, task: str, pattern: BasePattern)

One unit of work for a sub-agent.

Parallelism

Sub-agents run concurrently on a thread pool. Each sub-agent contributes one entry (name, ok, answer) to result.metadata["sub_results"], in completion order rather than submission order:

for sr in result.metadata["sub_results"]:
    print(f"[{sr['name']}] ok={sr['ok']}, answer={sr['answer'][:80]}")
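
Because failed sub-agents are kept in the list with ok set to False and an empty answer, callers will usually want to separate successes from failures before using the answers. A small sketch, assuming a hand-written list in the same shape as result.metadata["sub_results"] (the entries below are invented sample data, not real output):

```python
# Invented sample data in the shape run() builds: name, ok, answer.
sub_results = [
    {"name": "docs", "ok": True, "answer": "The API docs cover 12 endpoints."},
    {"name": "issues", "ok": False, "answer": ""},
]

# Answers keyed by sub-agent name, successes only.
succeeded = {sr["name"]: sr["answer"] for sr in sub_results if sr["ok"]}

# Names of sub-agents that timed out or raised.
failed = [sr["name"] for sr in sub_results if not sr["ok"]]
```

In the run() source, the error text itself is not stored in the metadata entry; it appears in the matching step's observation as "ERROR: ...".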

Boundary Policy

Use multi-agent execution only when the work splits cleanly. The supported runtime patterns are:

  • orchestrator-worker
  • evaluator-optimizer

Generic swarms are intentionally out of policy. A child agent must carry boundary metadata before it can run:

  • ownership_scope: files, modules, or task scopes owned by the child
  • worktree_id: isolated workspace identifier
  • mailbox_id: channel for parent/child coordination
  • cancel_token: explicit cancellation handle
  • synthesis_trace_id: trace id used when merging child output

The sub-agent registry generates defaults when a caller omits them, rejects unknown delegation patterns, and prevents two active children from owning the same scope. Team synthesis should cite child outputs through the synthesis trace rather than blindly merging free-form responses.
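
The three registry behaviours described above (default generation, pattern validation, and scope exclusivity) can be illustrated with a small stand-in. This is a hypothetical sketch, not the actual registry: ChildBoundary, SketchRegistry, new_id, the id formats, and the use of threading.Event as a cancel token are all assumptions, and scopes are compared by exact string equality only.

```python
import threading
import uuid
from dataclasses import dataclass, field

# The two supported runtime patterns named in the policy.
ALLOWED_PATTERNS = {"orchestrator-worker", "evaluator-optimizer"}


def new_id(prefix: str) -> str:
    """Generate a short default identifier (the format is an assumption)."""
    return f"{prefix}-{uuid.uuid4().hex[:8]}"


@dataclass
class ChildBoundary:
    ownership_scope: frozenset
    pattern: str
    # Defaults are generated when the caller omits a field.
    worktree_id: str = field(default_factory=lambda: new_id("wt"))
    mailbox_id: str = field(default_factory=lambda: new_id("mb"))
    cancel_token: threading.Event = field(default_factory=threading.Event)
    synthesis_trace_id: str = field(default_factory=lambda: new_id("trace"))


class SketchRegistry:
    def __init__(self) -> None:
        self._active: dict[str, ChildBoundary] = {}

    def register(self, name, scope, pattern, **overrides) -> ChildBoundary:
        if pattern not in ALLOWED_PATTERNS:
            raise ValueError(f"unknown delegation pattern: {pattern!r}")
        wanted = frozenset(scope)
        # Refuse any scope already owned by an active child.
        for other_name, other in self._active.items():
            overlap = wanted & other.ownership_scope
            if overlap:
                raise ValueError(f"{sorted(overlap)} already owned by {other_name!r}")
        boundary = ChildBoundary(ownership_scope=wanted, pattern=pattern, **overrides)
        self._active[name] = boundary
        return boundary

    def release(self, name: str) -> None:
        """Mark a child as finished so its scope can be reused."""
        self._active.pop(name, None)


registry = SketchRegistry()
docs_child = registry.register("docs", {"docs/"}, "orchestrator-worker")
```

A real registry would likely need path-aware overlap checks (for example, treating "docs/" and "docs/api/" as overlapping); exact matching is used here only to keep the sketch short.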

Prompt Contract

SecondBrain also carries a Codex-style delegation contract in the parent and child prompts when sub-agent tools are enabled. Parent agents are instructed to do immediate critical-path work locally, delegate only independent side work, and provide each child with:

  • ownership_scope
  • deliverable
  • shared_context
  • coordination_notes
  • verification
  • optional exclusions

Child agents receive those fields as trusted run context, together with the boundary metadata, budget, and output contract. They are explicitly told that they are not alone in the workspace, that they must not revert or overwrite user or sibling-agent changes, and that they should return compact findings with sources, confidence, and verification status for synthesis.
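
One way to picture the delegation contract is as a small record that the parent fills in and renders into the child's prompt. The sketch below is illustrative only: DelegationBrief, its render method, and the exact wording of the rendered text are assumptions, not the prompts SecondBrain actually ships. Only the field names come from the list above.

```python
from dataclasses import dataclass, field


@dataclass
class DelegationBrief:
    """Hypothetical container for the fields a parent gives each child."""

    ownership_scope: list
    deliverable: str
    shared_context: str
    coordination_notes: str
    verification: str
    exclusions: list = field(default_factory=list)  # optional

    def render(self) -> str:
        """Render the brief as plain text for a child prompt (wording invented)."""
        lines = [
            "You are not alone in this workspace. Do not revert or overwrite "
            "changes made by the user or by sibling agents.",
            f"ownership_scope: {', '.join(self.ownership_scope)}",
            f"deliverable: {self.deliverable}",
            f"shared_context: {self.shared_context}",
            f"coordination_notes: {self.coordination_notes}",
            f"verification: {self.verification}",
        ]
        if self.exclusions:
            lines.append(f"exclusions: {', '.join(self.exclusions)}")
        return "\n".join(lines)


brief = DelegationBrief(
    ownership_scope=["docs/api"],
    deliverable="Summary of undocumented endpoints",
    shared_context="Parent is preparing an API health report",
    coordination_notes="Report findings through the mailbox; do not edit src/",
    verification="List the files that were read",
)
prompt_text = brief.render()
```

Keeping the brief as structured data rather than free text makes it easier to check that no required field was omitted before a child is started.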

When to Use

Situation                           Recommendation
Multiple independent data sources   MultiAgentOrchestrator
Need concurrency / speed-up         MultiAgentOrchestrator
Single coherent task                ReAct or Reflexion
Sequential dependent steps          Plan & Execute
Overlapping edit ownership          Split scopes first or run sequentially