Skip to content

Async Patterns

All sync patterns have async equivalents that never block the event loop. The key concurrency primitive is run_concurrent() — runs N agents with asyncio.gather().

Quick start

import asyncio
from brain.patterns import AsyncReActAgent, AsyncRAGAgent, run_concurrent

# Single async agent
agent = AsyncReActAgent(tools={"search": my_search})
result = await agent.run("What is RAG?")

# Parallel agents — the main value-add
results = await run_concurrent([
    (AsyncReActAgent(tools={"search": web}), "Latest AI safety papers"),
    (AsyncRAGAgent(retriever=db_fn),         "Our Q1 auth decisions"),
])
for r in results:
    print(r.answer)

FastAPI integration

from fastapi import FastAPI
from brain.patterns import AsyncReActAgent, run_concurrent

app = FastAPI()
_agent = AsyncReActAgent(tools={"search": my_search})

@app.get("/ask")
async def ask(q: str):
    result = await _agent.run(q)
    return {"answer": result.answer, "ok": result.ok}

@app.get("/compare")
async def compare(q: str):
    results = await run_concurrent([
        (AsyncReActAgent(tools={"search": web}), q),
        (AsyncRAGAgent(retriever=db_fn),         q),
    ])
    return {"react": results[0].answer, "rag": results[1].answer}

Async retriever

AsyncRAGAgent accepts both sync and async def retrievers:

async def db_retriever(query: str, top_k: int = 3) -> list[str]:
    rows = await db.fetch("SELECT text FROM docs LIMIT $1", top_k)
    return [r.text for r in rows]

agent = AsyncRAGAgent(retriever=db_retriever)
result = await agent.run("What is our SLA?")

Async chain

from brain.patterns import AsyncPatternChain, AsyncRAGAgent, AsyncReflexionAgent

chain = AsyncPatternChain([
    AsyncRAGAgent(retriever=my_retriever),
    AsyncReflexionAgent(max_iterations=2),
])
result = await chain.run("What is our authentication strategy?")

Async router

from brain.patterns import AsyncPatternRouter

router = AsyncPatternRouter(
    routes={
        "sql":      AsyncRAGAgent(retriever=db_fn),
        "research": AsyncReActAgent(tools={"search": web}),
    },
    route_fn=lambda t: "sql" if "query" in t.lower() else "research",
)
result = await router.run("How many events last week?")
print(result.metadata["route"])  # "sql"

API Reference

brain.patterns.async_patterns.run_concurrent async

run_concurrent(agents_tasks: list[tuple[AsyncBasePattern | BasePattern, str]], *, return_exceptions: bool = False) -> list[PatternResult]

Run multiple (pattern, task) pairs concurrently with asyncio.gather().

This is the primary concurrency primitive — equivalent to MultiAgentOrchestrator but fully async and composable.

Parameters:

Name Type Description Default
agents_tasks list[tuple[AsyncBasePattern | BasePattern, str]]

List of (pattern, task) tuples. Patterns may be AsyncBasePattern or sync BasePattern (auto-wrapped).

required
return_exceptions bool

If True, failed patterns return a PatternResult with ok=False instead of propagating the exception. Default: False (exceptions propagate).

False

Returns:

Type Description
list[PatternResult]

List of PatternResult in the same order as agents_tasks.

Example::

results = await run_concurrent([
    (AsyncReActAgent(tools={"search": web}), "What is RAG?"),
    (AsyncRAGAgent(retriever=db_fn),         "Our Q1 decisions"),
    (AsyncReflexionAgent(max_iterations=2),  "Explain transformers"),
])
for r in results:
    print(r.answer)
Source code in brain/patterns/async_patterns.py
async def run_concurrent(
    agents_tasks: list[tuple[AsyncBasePattern | BasePattern, str]],
    *,
    return_exceptions: bool = False,
) -> list[PatternResult]:
    """Run multiple ``(pattern, task)`` pairs concurrently with ``asyncio.gather()``.

    This is the primary concurrency primitive — equivalent to
    ``MultiAgentOrchestrator`` but fully async and composable.

    Args:
        agents_tasks: List of ``(pattern, task)`` tuples. Patterns may be
            ``AsyncBasePattern`` or sync ``BasePattern`` (auto-wrapped).
        return_exceptions: If True, failed patterns return a ``PatternResult``
            with ``ok=False`` instead of propagating the exception.
            Default: False (exceptions propagate).

    Returns:
        List of ``PatternResult`` in the same order as ``agents_tasks``.

    Example::

        results = await run_concurrent([
            (AsyncReActAgent(tools={"search": web}), "What is RAG?"),
            (AsyncRAGAgent(retriever=db_fn),         "Our Q1 decisions"),
            (AsyncReflexionAgent(max_iterations=2),  "Explain transformers"),
        ])
        for r in results:
            print(r.answer)
    """

    async def _one(pattern: Any, task: str) -> PatternResult:
        try:
            if isinstance(pattern, AsyncBasePattern):
                return await pattern.run(task)
            return await _run_in_thread(pattern.run, task)
        except Exception as exc:  # noqa: BLE001
            if return_exceptions:
                return PatternResult(answer="", ok=False, error=str(exc))
            raise

    coros = [_one(p, t) for p, t in agents_tasks]
    return list(await asyncio.gather(*coros))

brain.patterns.async_patterns.AsyncReActAgent

AsyncReActAgent(**kwargs: Any)

Bases: AsyncBasePattern

Async ReAct agent.

Wraps the sync ReActAgent so it never blocks the event loop. All constructor arguments are forwarded unchanged to ReActAgent; see that class for the full parameter reference.

Source code in brain/patterns/async_patterns.py
def __init__(self, **kwargs: Any) -> None:
    from .react import ReActAgent

    self._inner = ReActAgent(**kwargs)

run async

run(task: str, **kwargs: Any) -> PatternResult

Run ReAct asynchronously.

Parameters:

Name Type Description Default
task str

The question or instruction for the agent.

required

Returns:

Type Description
PatternResult

PatternResult — same as the sync ReActAgent.

Source code in brain/patterns/async_patterns.py
async def run(self, task: str, **kwargs: Any) -> PatternResult:
    """Run ReAct asynchronously.

    Args:
        task: The question or instruction for the agent.

    Returns:
        PatternResult — same as the sync ReActAgent.
    """
    return await _run_in_thread(self._inner.run, task, **kwargs)

brain.patterns.async_patterns.AsyncRAGAgent

AsyncRAGAgent(retriever: AsyncRetriever, provider: Any = None, top_k: int = 3, include_sources: bool = False)

Bases: AsyncBasePattern

Async RAG agent — supports both sync and async retrievers.

Unlike the sync RAGAgent, the retriever may be an async def function. The LLM call is still dispatched via asyncio.to_thread() to avoid blocking.

Parameters:

Name Type Description Default
retriever AsyncRetriever

(query, top_k) -> list[str] — may be sync or async def.

required
provider Any

LLMProvider instance; defaults to LocalEchoProvider.

None
top_k int

Number of chunks to retrieve.

3
include_sources bool

If True, include retrieved chunks in result.metadata["sources"].

False
Source code in brain/patterns/async_patterns.py
def __init__(
    self,
    retriever: AsyncRetriever,
    provider: Any = None,
    top_k: int = 3,
    include_sources: bool = False,
) -> None:
    self._retriever = retriever
    self._top_k = top_k
    self._include_sources = include_sources
    self._provider = provider

run async

run(task: str, **kwargs: Any) -> PatternResult

Run RAG asynchronously.

Retrieves chunks concurrently with any other async work happening in the event loop, then generates the answer in a thread pool.

Parameters:

Name Type Description Default
task str

The question or instruction for the agent.

required

Returns:

Type Description
PatternResult

PatternResult with metadata["sources"] when include_sources=True.

Source code in brain/patterns/async_patterns.py
async def run(self, task: str, **kwargs: Any) -> PatternResult:
    """Run RAG asynchronously.

    Retrieves chunks concurrently with any other async work happening in the
    event loop, then generates the answer in a thread pool.

    Args:
        task: The question or instruction for the agent.

    Returns:
        PatternResult with ``metadata["sources"]`` when ``include_sources=True``.
    """
    # Retrieve (async-safe)
    try:
        chunks = await _call_retriever(self._retriever, task, self._top_k)
    except Exception as exc:  # noqa: BLE001
        logger.warning("AsyncRAGAgent: retriever raised: %s", exc)
        chunks = []

    # Build context string
    context = "\n\n".join(f"[{i + 1}] {c}" for i, c in enumerate(chunks)) if chunks else ""

    provider = self._get_provider()

    if not context:
        system = "You are a helpful assistant. Answer based on your knowledge."
        messages = [
            {"role": "system", "content": system},
            {"role": "user", "content": task},
        ]
    else:
        system = (
            "You are a helpful assistant. Answer using ONLY the provided context. "
            "If the context does not contain the answer, say so."
        )
        messages = [
            {"role": "system", "content": system},
            {"role": "user", "content": f"Context:\n{context}\n\nQuestion: {task}"},
        ]

    try:
        llm_result = await _run_in_thread(
            provider.generate, messages, tools=[], tool_choice="none"
        )
        answer = llm_result.text.strip()
    except Exception as exc:  # noqa: BLE001
        return PatternResult(answer="", ok=False, error=str(exc))

    meta: dict[str, Any] = {"chunks_retrieved": len(chunks)}
    if self._include_sources:
        meta["sources"] = chunks

    return PatternResult(answer=answer, iterations=1, metadata=meta)

brain.patterns.async_patterns.AsyncReflexionAgent

AsyncReflexionAgent(**kwargs: Any)

Bases: AsyncBasePattern

Async Reflexion agent.

Wraps the sync ReflexionAgent so it never blocks the event loop. All constructor arguments are forwarded unchanged to ReflexionAgent; see that class for the full parameter reference.

Source code in brain/patterns/async_patterns.py
def __init__(self, **kwargs: Any) -> None:
    from .reflexion import ReflexionAgent

    self._inner = ReflexionAgent(**kwargs)

run async

run(task: str, **kwargs: Any) -> PatternResult

Run Reflexion asynchronously.

Source code in brain/patterns/async_patterns.py
async def run(self, task: str, **kwargs: Any) -> PatternResult:
    """Run Reflexion asynchronously."""
    return await _run_in_thread(self._inner.run, task, **kwargs)

brain.patterns.async_patterns.AsyncPlanAndExecuteAgent

AsyncPlanAndExecuteAgent(**kwargs: Any)

Bases: AsyncBasePattern

Async Plan-and-Execute agent.

Wraps the sync PlanAndExecuteAgent so it never blocks the event loop. All constructor arguments are forwarded unchanged to PlanAndExecuteAgent; see that class for the full parameter reference.

Source code in brain/patterns/async_patterns.py
def __init__(self, **kwargs: Any) -> None:
    from .plan_execute import PlanAndExecuteAgent

    self._inner = PlanAndExecuteAgent(**kwargs)

run async

run(task: str, **kwargs: Any) -> PatternResult

Run Plan-and-Execute asynchronously.

Source code in brain/patterns/async_patterns.py
async def run(self, task: str, **kwargs: Any) -> PatternResult:
    """Run Plan-and-Execute asynchronously."""
    return await _run_in_thread(self._inner.run, task, **kwargs)

brain.patterns.async_patterns.AsyncPatternChain

AsyncPatternChain(patterns: list[AsyncBasePattern | BasePattern], transform_fn: Callable[..., Any] | None = None, stop_on_failure: bool = True)

Bases: AsyncBasePattern

Run async patterns sequentially, piping each result's answer to the next.

Each step awaits the previous one — use run_concurrent() if you need independent patterns to run in parallel.

Parameters:

Name Type Description Default
patterns list[AsyncBasePattern | BasePattern]

Ordered list of AsyncBasePattern or BasePattern instances. Sync patterns are automatically wrapped via asyncio.to_thread().

required
transform_fn Callable[..., Any] | None

async (result, step_index) -> str — produces the next task. Defaults to passing result.answer through. May be sync or async.

None
stop_on_failure bool

Stop if any step returns ok=False. Default: True.

True
Source code in brain/patterns/async_patterns.py
def __init__(
    self,
    patterns: list[AsyncBasePattern | BasePattern],
    transform_fn: Callable[..., Any] | None = None,
    stop_on_failure: bool = True,
) -> None:
    if not patterns:
        raise ValueError("AsyncPatternChain requires at least one pattern")
    self._patterns = patterns
    self._transform_fn = transform_fn
    self._stop_on_failure = stop_on_failure

run async

run(task: str, **kwargs: Any) -> PatternResult

Execute the chain sequentially, awaiting each step.

Returns:

Type Description
PatternResult

PatternResult from the final pattern, with metadata["chain_steps"].

Source code in brain/patterns/async_patterns.py
async def run(self, task: str, **kwargs: Any) -> PatternResult:
    """Execute the chain sequentially, awaiting each step.

    Returns:
        PatternResult from the final pattern, with ``metadata["chain_steps"]``.
    """
    chain_steps: list[dict[str, Any]] = []
    current_task = task
    total_iterations = 0

    for i, pattern in enumerate(self._patterns):
        name = type(pattern).__name__
        try:
            result = await self._step_run(pattern, current_task)
        except Exception as exc:  # noqa: BLE001
            chain_steps.append({"step": i, "pattern": name, "ok": False, "error": str(exc)})
            if self._stop_on_failure:
                return PatternResult(
                    answer="",
                    ok=False,
                    error=f"Step {i} ({name}) raised: {exc}",
                    iterations=total_iterations,
                    metadata={"chain_steps": chain_steps},
                )
            continue

        chain_steps.append(
            {
                "step": i,
                "pattern": name,
                "answer": result.answer,
                "ok": result.ok,
                "iterations": result.iterations,
                "error": result.error,
            }
        )
        total_iterations += result.iterations

        if not result.ok and self._stop_on_failure:
            return PatternResult(
                answer=result.answer,
                ok=False,
                error=f"Step {i} ({name}) failed: {result.error}",
                iterations=total_iterations,
                metadata={"chain_steps": chain_steps},
            )

        if i < len(self._patterns) - 1:
            if self._transform_fn is not None:
                try:
                    if asyncio.iscoroutinefunction(self._transform_fn):
                        current_task = await self._transform_fn(result, i)
                    else:
                        current_task = self._transform_fn(result, i)
                except Exception as exc:  # noqa: BLE001
                    logger.warning("AsyncPatternChain: transform_fn raised: %s", exc)
                    current_task = result.answer
            else:
                current_task = result.answer

    last = chain_steps[-1] if chain_steps else {}
    return PatternResult(
        answer=last.get("answer", ""),
        ok=all(s.get("ok", False) for s in chain_steps),
        iterations=total_iterations,
        metadata={"chain_steps": chain_steps, "chain_length": len(self._patterns)},
    )

brain.patterns.async_patterns.AsyncPatternRouter

AsyncPatternRouter(routes: dict[str, AsyncBasePattern | BasePattern], route_fn: Callable[..., Any], default: str | None = None)

Bases: AsyncBasePattern

Route a task to one of several async patterns based on a routing function.

Parameters:

Name Type Description Default
routes dict[str, AsyncBasePattern | BasePattern]

Dict mapping label → AsyncBasePattern or BasePattern.

required
route_fn Callable[..., Any]

Sync or async callable (task) -> label.

required
default str | None

Fallback label if the route is unknown.

None
Source code in brain/patterns/async_patterns.py
def __init__(
    self,
    routes: dict[str, AsyncBasePattern | BasePattern],
    route_fn: Callable[..., Any],
    default: str | None = None,
) -> None:
    if not routes:
        raise ValueError("AsyncPatternRouter requires at least one route")
    self._routes = routes
    self._route_fn = route_fn
    self._default = default

run async

run(task: str, **kwargs: Any) -> PatternResult

Route and run asynchronously.

Source code in brain/patterns/async_patterns.py
async def run(self, task: str, **kwargs: Any) -> PatternResult:
    """Route and run asynchronously."""
    try:
        if asyncio.iscoroutinefunction(self._route_fn):
            label = await self._route_fn(task)
        else:
            label = self._route_fn(task)
    except Exception as exc:  # noqa: BLE001
        logger.warning("AsyncPatternRouter: route_fn raised: %s", exc)
        label = self._default or ""

    pattern = self._routes.get(label)
    if pattern is None and self._default:
        label = self._default
        pattern = self._routes.get(self._default)

    if pattern is None:
        return PatternResult(
            answer="",
            ok=False,
            error=f"AsyncPatternRouter: unknown route {label!r}. "
            f"Available: {list(self._routes)}",
            metadata={"route": label, "available_routes": list(self._routes)},
        )

    if isinstance(pattern, AsyncBasePattern):
        result = await pattern.run(task, **kwargs)
    else:
        result = await _run_in_thread(pattern.run, task, **kwargs)

    result.metadata["route"] = label
    result.metadata["pattern"] = type(pattern).__name__
    result.metadata["available_routes"] = list(self._routes)
    return result

Sync vs Async comparison

Sync Async When to use async
ReActAgent AsyncReActAgent FastAPI, async event loops
RAGAgent AsyncRAGAgent Async DB/API retrievers
ReflexionAgent AsyncReflexionAgent Async pipelines
PatternChain AsyncPatternChain Sequential async steps
PatternRouter AsyncPatternRouter Async routing logic
MultiAgentOrchestrator run_concurrent() True parallel execution