Skip to content

Streaming ReAct Agent

StreamingReActAgent wraps the ReAct pattern with a generator interface. Instead of blocking until a final answer, it yields PatternEvent objects as the agent reasons and acts — ideal for real-time UIs, progress bars, and streaming APIs.

Usage

from brain.patterns import StreamingReActAgent, EventKind

agent = StreamingReActAgent(
    tools={"search": lambda q: f"[result: {q}]"},
    max_steps=6,
)

for event in agent.stream("What is the state of AI safety research?"):
    if event.kind == EventKind.THOUGHT:
        print(f"  thinking... {event.text}")
    elif event.kind == EventKind.ACTION:
        print(f"  calling {event.action}({event.action_input})")
    elif event.kind == EventKind.OBSERVATION:
        print(f"  observed: {event.text}")
    elif event.kind == EventKind.FINAL:
        print(f"\nAnswer: {event.text}")
    elif event.kind == EventKind.ERROR:
        print(f"Error: {event.text}")

Blocking interface

run() aggregates the stream into a PatternResult — same interface as ReActAgent:

result = agent.run("What is RAG?")
print(result.answer)
print(result.metadata["streaming"])   # True

API Reference

brain.patterns.streaming.StreamingReActAgent

StreamingReActAgent(tools: dict[str, Tool] | None = None, provider: LLMProvider | None = None, max_steps: int = 8, system_prompt: str | None = None)

Bases: BasePattern

ReAct agent with a streaming generator interface.

Emits PatternEvent objects for each reasoning step so UIs and pipelines can react in real time rather than waiting for the full answer.

Parameters:

Name Type Description Default
tools dict[str, Tool] | None

Mapping of tool name → callable(str) -> str.

None
provider LLMProvider | None

LLMProvider instance; if None, uses LocalEchoProvider (offline-safe).

None
max_steps int

Maximum Thought/Action/Observation cycles before giving up.

8
system_prompt str | None

Override the default ReAct system prompt.

None
Source code in brain/patterns/streaming.py
def __init__(
    self,
    tools: dict[str, Tool] | None = None,
    provider: LLMProvider | None = None,
    max_steps: int = 8,
    system_prompt: str | None = None,
) -> None:
    self.tools: dict[str, Tool] = tools or {}
    self.max_steps = max_steps
    self._system_prompt = system_prompt
    self._provider = provider

stream

stream(task: str, **kwargs: Any) -> Iterator[PatternEvent]

Run the agent and yield PatternEvent objects for each step.

The generator always ends with either an EventKind.FINAL or an EventKind.ERROR event. Callers can use next(), for loops, or collect via list(agent.stream(task)).

Parameters:

Name Type Description Default
task str

The question or instruction for the agent.

required
**kwargs Any

Ignored; present for interface compatibility.

{}

Yields:

Type Description
PatternEvent

PatternEvent — one per thought, action, observation, and final answer.

Source code in brain/patterns/streaming.py
def stream(self, task: str, **kwargs: Any) -> Iterator[PatternEvent]:
    """Run the agent and yield PatternEvent objects for each step.

    The generator always ends with either an ``EventKind.FINAL`` or an
    ``EventKind.ERROR`` event. Callers can use ``next()``, ``for`` loops,
    or collect via ``list(agent.stream(task))``.

    Args:
        task: The question or instruction for the agent.
        **kwargs: Ignored; present for interface compatibility.

    Yields:
        PatternEvent — one per thought, action, observation, and final answer.
    """
    provider = self._get_provider()
    tool_names = ", ".join(self.tools.keys()) if self.tools else "none"
    system = (self._system_prompt or _SYSTEM_PROMPT).format(tool_names=tool_names)

    messages: list[dict[str, Any]] = [
        {"role": "system", "content": system},
        {"role": "user", "content": task},
    ]

    for i in range(self.max_steps):
        yield PatternEvent(kind=EventKind.STEP_START, step_index=i)

        try:
            result = provider.generate(messages, tools=[], tool_choice="none")
        except Exception as exc:  # noqa: BLE001
            logger.warning("StreamingReActAgent: provider error at step %d: %s", i, exc)
            yield PatternEvent(kind=EventKind.ERROR, text=str(exc), step_index=i)
            return

        text = result.text.strip()
        thought, action, action_input, final = _parse(text)

        if final is not None:
            yield PatternEvent(kind=EventKind.THOUGHT, text=thought, step_index=i)
            yield PatternEvent(kind=EventKind.FINAL, text=final, step_index=i)
            return

        if thought:
            yield PatternEvent(kind=EventKind.THOUGHT, text=thought, step_index=i)

        if action and action in self.tools:
            yield PatternEvent(
                kind=EventKind.ACTION,
                action=action,
                action_input=action_input,
                step_index=i,
            )
            try:
                observation = str(self.tools[action](action_input))
            except Exception as exc:  # noqa: BLE001
                observation = f"Tool error: {exc}"
            yield PatternEvent(kind=EventKind.OBSERVATION, text=observation, step_index=i)
            messages.append({"role": "assistant", "content": text})
            messages.append({"role": "user", "content": f"Observation: {observation}"})

        elif action:
            # Unknown tool
            observation = f"Unknown tool '{action}'. Available: {tool_names}"
            yield PatternEvent(
                kind=EventKind.ACTION,
                action=action,
                action_input=action_input,
                step_index=i,
            )
            yield PatternEvent(kind=EventKind.OBSERVATION, text=observation, step_index=i)
            messages.append({"role": "assistant", "content": text})
            messages.append({"role": "user", "content": f"Observation: {observation}"})

        else:
            # Plain response — treat as final
            yield PatternEvent(kind=EventKind.FINAL, text=text, step_index=i)
            return

    yield PatternEvent(
        kind=EventKind.ERROR,
        text="max_steps reached without Final Answer",
        step_index=self.max_steps - 1,
    )

run

run(task: str, **kwargs: Any) -> PatternResult

Blocking interface — collects all streaming events into a PatternResult.

Equivalent to ReActAgent.run() but internally uses the generator so the same logic is exercised in both streaming and blocking modes.

Parameters:

Name Type Description Default
task str

The question or instruction for the agent.

required

Returns:

Type Description
PatternResult

PatternResult with answer, steps, iterations, ok, error, metadata.

Source code in brain/patterns/streaming.py
def run(self, task: str, **kwargs: Any) -> PatternResult:
    """Blocking interface — collects all streaming events into a PatternResult.

    Equivalent to ``ReActAgent.run()`` but internally uses the generator
    so the same logic is exercised in both streaming and blocking modes.

    Args:
        task: The question or instruction for the agent.

    Returns:
        PatternResult with answer, steps, iterations, ok, error, metadata.
    """
    steps: list[Step] = []
    current: dict[str, Any] = {}
    final_answer = ""
    error: str | None = None
    iterations = 0

    for event in self.stream(task, **kwargs):
        if event.kind == EventKind.STEP_START:
            current = {
                "index": event.step_index,
                "thought": "",
                "action": "",
                "action_input": "",
                "observation": "",
                "text": "",
            }
            iterations = event.step_index + 1

        elif event.kind == EventKind.THOUGHT:
            current["thought"] = event.text

        elif event.kind == EventKind.ACTION:
            current["action"] = event.action
            current["action_input"] = event.action_input

        elif event.kind == EventKind.OBSERVATION:
            current["observation"] = event.text
            steps.append(
                Step(
                    index=current.get("index", len(steps)),
                    thought=current.get("thought", ""),
                    action=current.get("action", ""),
                    action_input=current.get("action_input", ""),
                    observation=event.text,
                )
            )

        elif event.kind == EventKind.FINAL:
            final_answer = event.text
            # Add a final step record if we had a thought
            if current.get("thought") and not any(
                s.index == current.get("index", -1) for s in steps
            ):
                steps.append(
                    Step(
                        index=current.get("index", len(steps)),
                        thought=current.get("thought", ""),
                        observation="",
                    )
                )

        elif event.kind == EventKind.ERROR:
            error = event.text

    ok = error is None
    return PatternResult(
        answer=final_answer,
        steps=steps,
        iterations=iterations,
        ok=ok,
        error=error,
        metadata={"streaming": True},
    )

brain.patterns.streaming.PatternEvent dataclass

PatternEvent(kind: EventKind, text: str = '', action: str = '', action_input: str = '', step_index: int = 0, metadata: dict[str, Any] = dict())

A single streaming event from StreamingReActAgent.

Attributes:

Name Type Description
kind EventKind

What kind of event this is (see EventKind).

text str

The text payload (thought, observation, answer, error message).

action str

Tool name (only set for ACTION events).

action_input str

Tool argument (only set for ACTION events).

step_index int

Which agent iteration this event belongs to (0-based).

metadata dict[str, Any]

Arbitrary extra data attached to the event.

brain.patterns.streaming.EventKind

Bases: str, Enum

Type tags for streaming events emitted by StreamingReActAgent.

Event sequence

For a typical two-step ReAct run:

STEP_START(0) → THOUGHT → ACTION → OBSERVATION →
STEP_START(1) → THOUGHT → FINAL

The generator always terminates with either FINAL or ERROR.

When to Use

Situation Recommendation
Streaming UI (chat, terminal) StreamingReActAgent
Progress callbacks needed StreamingReActAgent
Simple blocking call ReActAgent
Requires human approval HITLAgent