Async Patterns
All sync patterns have async equivalents that never block the event loop.
The key concurrency primitive is run_concurrent(), which runs N agents concurrently with asyncio.gather().
Quick start
import asyncio
from brain.patterns import AsyncReActAgent, AsyncRAGAgent, run_concurrent

# my_search, web and db_fn are placeholders for your own tool and retriever callables.

# Single async agent
agent = AsyncReActAgent(tools={"search": my_search})
result = await agent.run("What is RAG?")

# Parallel agents: the main value-add
results = await run_concurrent([
    (AsyncReActAgent(tools={"search": web}), "Latest AI safety papers"),
    (AsyncRAGAgent(retriever=db_fn), "Our Q1 auth decisions"),
])
for r in results:
    print(r.answer)
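The quick-start snippet uses `await` at the top level, which works in a notebook or an async REPL but not in a plain script. A minimal sketch of the script form follows. `EchoAgent` and `Result` are hypothetical stand-ins (not part of `brain.patterns`) so the example runs without the library installed, and `asyncio.gather()` stands in for `run_concurrent()`.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Result:
    """Hypothetical stand-in for PatternResult; only `answer` is modelled."""
    answer: str


class EchoAgent:
    """Hypothetical stand-in for an async agent such as AsyncReActAgent."""

    async def run(self, task: str) -> Result:
        await asyncio.sleep(0)  # yield to the event loop, as real I/O would
        return Result(answer=f"echo: {task}")


async def main() -> list[str]:
    agent = EchoAgent()
    # Schedule two independent runs together and wait for both.
    results = await asyncio.gather(
        agent.run("What is RAG?"),
        agent.run("Latest AI safety papers"),
    )
    return [r.answer for r in results]


if __name__ == "__main__":
    # asyncio.run() creates the event loop that top-level `await` assumes.
    for answer in asyncio.run(main()):
        print(answer)
```

With the real library, the body of `main()` would presumably hold the quick-start code unchanged.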
FastAPI integration
from fastapi import FastAPI
from brain.patterns import AsyncRAGAgent, AsyncReActAgent, run_concurrent

app = FastAPI()

# Created once at import time and shared across requests.
_agent = AsyncReActAgent(tools={"search": my_search})


@app.get("/ask")
async def ask(q: str):
    result = await _agent.run(q)
    return {"answer": result.answer, "ok": result.ok}


@app.get("/compare")
async def compare(q: str):
    # Run both agents on the same question concurrently.
    results = await run_concurrent([
        (AsyncReActAgent(tools={"search": web}), q),
        (AsyncRAGAgent(retriever=db_fn), q),
    ])
    return {"react": results[0].answer, "rag": results[1].answer}
Async retriever
AsyncRAGAgent accepts both sync and async def retrievers:
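async def db_retriever(query: str, top_k: int = 3) -> list[str]:
    # db is assumed to be an asyncpg-style connection or pool.
    # The query text is not used to filter here; a real retriever would search on it.
    rows = await db.fetch("SELECT text FROM docs LIMIT $1", top_k)
    return [r["text"] for r in rows]

agent = AsyncRAGAgent(retriever=db_retriever)
result = await agent.run("What is our SLA?")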
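One way an agent could accept both kinds of retriever is to check whether the callable is a coroutine function and await it if so, otherwise run it in a worker thread. The sketch below illustrates that idea only; `call_retriever` and both example retrievers are hypothetical names, and this is not necessarily how `AsyncRAGAgent` is implemented.

```python
import asyncio
import inspect
from typing import Awaitable, Callable, Union

# A retriever takes (query, top_k) and returns chunks, directly or as an awaitable.
Retriever = Callable[[str, int], Union[list[str], Awaitable[list[str]]]]


async def call_retriever(retriever: Retriever, query: str, top_k: int = 3) -> list[str]:
    """Call a retriever that may be sync or async (hypothetical helper)."""
    if inspect.iscoroutinefunction(retriever):
        return await retriever(query, top_k)
    # Sync retrievers run in a worker thread so they cannot stall the event loop.
    return await asyncio.to_thread(retriever, query, top_k)


def sync_retriever(query: str, top_k: int = 3) -> list[str]:
    return [f"sync chunk {i} for {query!r}" for i in range(top_k)]


async def async_retriever(query: str, top_k: int = 3) -> list[str]:
    await asyncio.sleep(0)  # placeholder for an awaited DB or HTTP call
    return [f"async chunk {i} for {query!r}" for i in range(top_k)]


if __name__ == "__main__":
    print(asyncio.run(call_retriever(sync_retriever, "sla", 2)))
    print(asyncio.run(call_retriever(async_retriever, "sla", 2)))
```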
Async chain
from brain.patterns import AsyncPatternChain, AsyncRAGAgent, AsyncReflexionAgent

chain = AsyncPatternChain([
    AsyncRAGAgent(retriever=my_retriever),
    AsyncReflexionAgent(max_iterations=2),
])
result = await chain.run("What is our authentication strategy?")
Async router
from brain.patterns import AsyncPatternRouter, AsyncRAGAgent, AsyncReActAgent

router = AsyncPatternRouter(
    routes={
        "sql": AsyncRAGAgent(retriever=db_fn),
        "research": AsyncReActAgent(tools={"search": web}),
    },
    # Counting questions go to the "sql" route; everything else goes to "research".
    route_fn=lambda t: "sql" if "how many" in t.lower() else "research",
)
result = await router.run("How many events last week?")
print(result.metadata["route"])  # "sql"
API Reference
brain.patterns.async_patterns.run_concurrent (async)
run_concurrent(agents_tasks: list[tuple[AsyncBasePattern | BasePattern, str]], *, return_exceptions: bool = False) -> list[PatternResult]
Run multiple (pattern, task) pairs concurrently with asyncio.gather().
This is the primary concurrency primitive — equivalent to
MultiAgentOrchestrator but fully async and composable.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| agents_tasks | list[tuple[AsyncBasePattern \| BasePattern, str]] | List of | required |
| return_exceptions | bool | If True, failed patterns return a | False |
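The `return_exceptions` flag shares its name with the `asyncio.gather()` argument. How `run_concurrent()` represents a failed pattern in its result list is not shown here, so the sketch below only demonstrates the underlying `gather()` behaviour. `ok`, `boom` and `gather_demo` are hypothetical names.

```python
import asyncio


async def ok(task: str) -> str:
    return f"answer to {task}"


async def boom(task: str) -> str:
    raise RuntimeError(f"failed on {task}")


async def gather_demo(return_exceptions: bool) -> list:
    # With return_exceptions=True, the exception object takes the failed slot.
    # With return_exceptions=False, the first exception propagates to the caller.
    return await asyncio.gather(
        ok("a"),
        boom("b"),
        return_exceptions=return_exceptions,
    )


if __name__ == "__main__":
    print(asyncio.run(gather_demo(True)))
    try:
        asyncio.run(gather_demo(False))
    except RuntimeError as exc:
        print(f"propagated: {exc}")
```

In plain `asyncio.gather()`, the default (`False`) means one failure surfaces as an exception at the `await`, so callers who want partial results need the flag set to `True`.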
Returns:
| Type | Description |
|---|---|
| list[PatternResult] | List of |
Example:
results = await run_concurrent([
    (AsyncReActAgent(tools={"search": web}), "What is RAG?"),
    (AsyncRAGAgent(retriever=db_fn), "Our Q1 decisions"),
    (AsyncReflexionAgent(max_iterations=2), "Explain transformers"),
])
for r in results:
    print(r.answer)
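Because the signature accepts both `AsyncBasePattern` and `BasePattern`, a function of this shape plausibly awaits async patterns directly and pushes sync ones to a thread before gathering. The sketch below is an assumption about that shape, not the library's source. `run_concurrent_sketch`, `SlowAsync` and `FastSync` are hypothetical names.

```python
import asyncio
import inspect


class SlowAsync:
    """Hypothetical async pattern that takes 50 ms."""

    def __init__(self, log: list[str]):
        self.log = log

    async def run(self, task: str) -> str:
        await asyncio.sleep(0.05)
        self.log.append(task)  # record completion order
        return f"async:{task}"


class FastSync:
    """Hypothetical sync pattern that returns immediately."""

    def __init__(self, log: list[str]):
        self.log = log

    def run(self, task: str) -> str:
        self.log.append(task)  # record completion order
        return f"sync:{task}"


async def run_concurrent_sketch(pairs, *, return_exceptions: bool = False) -> list:
    async def run_one(pattern, task):
        if inspect.iscoroutinefunction(pattern.run):
            return await pattern.run(task)
        # Sync patterns run in a worker thread so the loop stays free.
        return await asyncio.to_thread(pattern.run, task)

    return await asyncio.gather(
        *(run_one(p, t) for p, t in pairs),
        return_exceptions=return_exceptions,
    )


if __name__ == "__main__":
    log: list[str] = []
    pairs = [(SlowAsync(log), "first"), (FastSync(log), "second")]
    print(asyncio.run(run_concurrent_sketch(pairs)))
    print(log)
```

`asyncio.gather()` returns results in input order regardless of which task finishes first, so `results[i]` lines up with the i-th `(pattern, task)` pair in this sketch.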
Source code in brain/patterns/async_patterns.py
brain.patterns.async_patterns.AsyncReActAgent
Bases: AsyncBasePattern
Async ReAct agent.
Wraps the sync ReActAgent so it never blocks the event loop.
All constructor arguments are forwarded unchanged to ReActAgent;
see that class for the full parameter reference.
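A common way to wrap a blocking agent so that it does not block the event loop is `asyncio.to_thread()`, which the AsyncRAGAgent description also mentions for the LLM call. The sketch below assumes that approach. `BlockingAgent` and `AsyncWrapper` are hypothetical stand-ins for `ReActAgent` and `AsyncReActAgent`.

```python
import asyncio
import contextlib
import time


class BlockingAgent:
    """Hypothetical sync agent whose run() blocks, like a sync LLM call."""

    def __init__(self, delay: float = 0.1):
        self.delay = delay

    def run(self, task: str) -> str:
        time.sleep(self.delay)  # blocks the calling thread
        return f"done: {task}"


class AsyncWrapper:
    """Forwards constructor arguments unchanged and runs the agent in a thread."""

    def __init__(self, *args, **kwargs):
        self._inner = BlockingAgent(*args, **kwargs)

    async def run(self, task: str) -> str:
        return await asyncio.to_thread(self._inner.run, task)


async def demo() -> tuple[str, int]:
    ticks = 0

    async def ticker() -> None:
        # Counts how often the event loop gets control while the agent runs.
        nonlocal ticks
        while True:
            await asyncio.sleep(0.01)
            ticks += 1

    ticker_task = asyncio.create_task(ticker())
    answer = await AsyncWrapper(delay=0.1).run("What is RAG?")
    ticker_task.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await ticker_task
    return answer, ticks


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

A non-zero tick count shows that other coroutines kept running while the blocking call was in progress.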
run (async)
Run ReAct asynchronously.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| task | str | The question or instruction for the agent. | required |
Returns:
| Type | Description |
|---|---|
| PatternResult | Same as the sync ReActAgent. |
brain.patterns.async_patterns.AsyncRAGAgent
AsyncRAGAgent(retriever: AsyncRetriever, provider: Any = None, top_k: int = 3, include_sources: bool = False)
Bases: AsyncBasePattern
Async RAG agent — supports both sync and async retrievers.
Unlike the sync RAGAgent, the retriever may be an async def function.
The LLM call is still dispatched via asyncio.to_thread() to avoid blocking.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| retriever | AsyncRetriever | | required |
| provider | Any | LLMProvider instance; defaults to LocalEchoProvider. | None |
| top_k | int | Number of chunks to retrieve. | 3 |
| include_sources | bool | If True, include retrieved chunks in | False |
run (async)
Run RAG asynchronously.
Retrieves chunks concurrently with any other async work happening in the event loop, then generates the answer in a thread pool.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| task | str | The question or instruction for the agent. | required |
Returns:
| Type | Description |
|---|---|
| PatternResult | PatternResult with |
brain.patterns.async_patterns.AsyncReflexionAgent
Bases: AsyncBasePattern
Async Reflexion agent.
Wraps the sync ReflexionAgent so it never blocks the event loop.
All constructor arguments are forwarded unchanged to ReflexionAgent;
see that class for the full parameter reference.
run (async)
brain.patterns.async_patterns.AsyncPlanAndExecuteAgent
Bases: AsyncBasePattern
Async Plan-and-Execute agent.
Wraps the sync PlanAndExecuteAgent so it never blocks the event loop.
All constructor arguments are forwarded unchanged to PlanAndExecuteAgent;
see that class for the full parameter reference.
run (async)
brain.patterns.async_patterns.AsyncPatternChain
AsyncPatternChain(patterns: list[AsyncBasePattern | BasePattern], transform_fn: Callable[..., Any] | None = None, stop_on_failure: bool = True)
Bases: AsyncBasePattern
Run async patterns sequentially, piping each result's answer to the next.
Each step awaits the previous one — use run_concurrent() if you need
independent patterns to run in parallel.
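The sequential piping described above can be sketched as a loop that awaits each step and feeds its answer to the next. Several details here are assumptions: failure is taken to mean `result.ok` is false, and `transform_fn` is taken to receive the previous result and return the next task string. `run_chain`, `Result`, `Upper`, `Exclaim` and `Fail` are hypothetical names.

```python
import asyncio
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class Result:
    """Hypothetical stand-in for PatternResult."""
    answer: str
    ok: bool = True


class Upper:
    async def run(self, task: str) -> Result:
        return Result(task.upper())


class Exclaim:
    async def run(self, task: str) -> Result:
        return Result(task + "!")


class Fail:
    async def run(self, task: str) -> Result:
        return Result("", ok=False)


async def run_chain(
    patterns: list,
    task: str,
    transform_fn: Optional[Callable[[Result], str]] = None,
    stop_on_failure: bool = True,
) -> Result:
    result = Result(answer=task)
    for pattern in patterns:
        # Each step starts only after the previous one has finished.
        result = await pattern.run(task)
        if stop_on_failure and not result.ok:
            break
        # The next step receives the (optionally transformed) answer.
        task = transform_fn(result) if transform_fn else result.answer
    return result


if __name__ == "__main__":
    print(asyncio.run(run_chain([Upper(), Exclaim()], "hi")))
    print(asyncio.run(run_chain([Fail(), Exclaim()], "hi")))
```

Because every step depends on the previous answer, total latency is the sum of the steps, which is why independent work is better served by `run_concurrent()`.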
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| patterns | list[AsyncBasePattern \| BasePattern] | Ordered list of | required |
| transform_fn | Callable[..., Any] \| None | | None |
| stop_on_failure | bool | Stop if any step returns | True |
run (async)
Execute the chain sequentially, awaiting each step.
Returns:
| Type | Description |
|---|---|
| PatternResult | PatternResult from the final pattern, with |
brain.patterns.async_patterns.AsyncPatternRouter
AsyncPatternRouter(routes: dict[str, AsyncBasePattern | BasePattern], route_fn: Callable[..., Any], default: str | None = None)
Bases: AsyncBasePattern
Route a task to one of several async patterns based on a routing function.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| routes | dict[str, AsyncBasePattern \| BasePattern] | Dict mapping label → | required |
| route_fn | Callable[..., Any] | Sync or async callable | required |
| default | str \| None | Fallback label if the route is unknown. | None |
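A routing step that accepts either a sync or an async `route_fn` can call it and await the result only if it is awaitable, then fall back to `default` for an unknown label. The sketch below illustrates that logic under those assumptions. `route_and_run` and `Tagged` are hypothetical names, and raising `KeyError` when no default is set is a choice made for this example.

```python
import asyncio
import inspect
from typing import Optional


class Tagged:
    """Hypothetical async pattern that prefixes the task with its tag."""

    def __init__(self, tag: str):
        self.tag = tag

    async def run(self, task: str) -> str:
        return f"{self.tag}: {task}"


async def route_and_run(routes: dict, route_fn, task: str, default: Optional[str] = None):
    label = route_fn(task)
    if inspect.isawaitable(label):
        # An async route_fn returns a coroutine; await it to get the label.
        label = await label
    if label not in routes:
        if default is None:
            raise KeyError(f"no route for label {label!r}")
        label = default  # fall back when the label is unknown
    return label, await routes[label].run(task)


async def async_route(task: str) -> str:
    return "sql" if "how many" in task.lower() else "other"


if __name__ == "__main__":
    routes = {"sql": Tagged("sql"), "research": Tagged("research")}
    print(asyncio.run(route_and_run(routes, async_route, "How many events?")))
    print(asyncio.run(route_and_run(routes, async_route, "Summarise RAG", default="research")))
```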
run (async)
Route and run asynchronously.
Sync vs Async comparison
| Sync | Async | When to use async |
|---|---|---|
| ReActAgent | AsyncReActAgent | FastAPI, async event loops |
| RAGAgent | AsyncRAGAgent | Async DB/API retrievers |
| ReflexionAgent | AsyncReflexionAgent | Async pipelines |
| PatternChain | AsyncPatternChain | Sequential async steps |
| PatternRouter | AsyncPatternRouter | Async routing logic |
| MultiAgentOrchestrator | run_concurrent() | Running independent agents concurrently |