Agent Harness: A Turn End-to-End
This document traces every stage in AgentHarness.run_turn(), in execution order.
Stage 0: Turn Start
```python
self._thinking.on_turn_start()
self.callbacks.on_turn_start(session_id, self._thinking.turn_count)
self.event_log.log_chat_message(session_id, "user", user_message, ...)
```
- `ThinkingManager.on_turn_start()` increments the internal turn counter.
- The `on_turn_start` callback fires (e.g. to display a spinner in the UI).
- The user message is written to the event log.
Stage 1: Parallel Memory + Context Load
```python
# Bind the executor to `pool` so the submit() calls below can use it.
with ThreadPoolExecutor(max_workers=2) as pool:
    memory_future = pool.submit(_load_memory_context, user_message, session_id)
    context_future = pool.submit(tools.run, "get_context_pack", {...})
```
Two independent I/O operations run concurrently:
| Future | What it does |
|---|---|
| `memory_future` | Calls `MemoryRetriever.retrieve_for_context()`, which pulls episodic memories matching the query, then extracts preference / habit items into a list. |
| `context_future` | Runs the `get_context_pack` tool, which queries the vault vector store for relevant facts. |
After both complete, any user preferences extracted from memory are merged into
the context pack's preferences field (deduped with dict.fromkeys).
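The merge step can be sketched as follows. This is a minimal sketch, assuming the context pack is a plain dict with a `preferences` list; `merge_preferences` is a hypothetical helper name, not necessarily what the harness uses.

```python
def merge_preferences(context_pack: dict, memory_prefs: list[str]) -> dict:
    """Merge memory-derived preferences into the pack (hypothetical helper)."""
    existing = context_pack.get("preferences", [])
    # dict.fromkeys keeps first-seen order and drops later duplicates.
    merged = list(dict.fromkeys([*existing, *memory_prefs]))
    return {**context_pack, "preferences": merged}


pack = {"facts": ["vault fact"], "preferences": ["metric units"]}
merged_pack = merge_preferences(pack, ["dark mode", "metric units"])
print(merged_pack["preferences"])  # ['metric units', 'dark mode']
```

Unlike `set()`, `dict.fromkeys` preserves the original ordering, so preferences already in the context pack stay ahead of those added from memory.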
Stage 2: Resource Envelope
```python
budget = TurnBudget.create(timeout_s=self.timeout_s, max_steps=self.max_steps)
state = TurnRuntimeState(session_id=session_id, budget=budget)
```
- `TurnBudget` sets an absolute wall-clock deadline (`time.perf_counter() + timeout_s`).
- Default limits: `max_steps=6`, `max_tool_calls=6`, `max_reflections=4`.
- `TurnRuntimeState` is the single mutable container for this turn.
- All downstream collaborators write to it through explicit parameters.
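The deadline arithmetic can be illustrated with a small stand-in class. This is a sketch under stated assumptions: the field layout and method bodies are guesses; only the method names (`create`, `remaining_s`, `is_expired`), the default limits, and the `time.perf_counter() + timeout_s` formula come from this document.

```python
import time
from dataclasses import dataclass


@dataclass
class TurnBudget:
    """Hypothetical stand-in for the harness's TurnBudget."""
    deadline: float
    max_steps: int = 6
    max_tool_calls: int = 6
    max_reflections: int = 4

    @classmethod
    def create(cls, timeout_s: float, max_steps: int = 6) -> "TurnBudget":
        # The deadline is computed once, when the turn starts.
        return cls(deadline=time.perf_counter() + timeout_s, max_steps=max_steps)

    def remaining_s(self) -> float:
        # Clamp at zero so callers never see a negative remaining time.
        return max(0.0, self.deadline - time.perf_counter())

    def is_expired(self) -> bool:
        return time.perf_counter() >= self.deadline


budget = TurnBudget.create(timeout_s=30.0)
expired_budget = TurnBudget.create(timeout_s=0.0)
```

Because the deadline is absolute, every later stage can ask `remaining_s()` without knowing how much time earlier stages consumed.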
Stage 3: Message Assembly
```python
prep = self._turn_preparer.prepare(
    session_id, history, user_message,
    context_pack, memory_context_text,
    runtime_provider, runtime_model,
    skill_context, system_prompt_override,
)
```
TurnPreparer.prepare() builds the messages list in this order:
1. `{"role": "system", "content": system_prompt_override}`: base + thinking prefix
2. `{"role": "system", "content": runtime_metadata}`: session_id, provider, model, today's date
3. `{"role": "system", "content": memory_context_text}` (only if non-empty)
4. History messages (role/content pairs from prior turns)
5. `{"role": "system", "content": untrusted_context_block(context_pack)}`: prompt-hardened vault context
6. `{"role": "system", "content": skill_context}` (only if provided)
7. `{"role": "user", "content": user_message}`
The render hash is computed fresh from system_prompt_override (not sticky from
the previous turn).
Returns a frozen TurnPreparation dataclass.
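The ordering above can be sketched as a plain function. This is an illustrative sketch only: `assemble_messages` and its parameters are hypothetical, and the real `TurnPreparer.prepare()` also computes the render hash and returns a `TurnPreparation`, which is omitted here.

```python
def assemble_messages(system_prompt, runtime_metadata, memory_context_text,
                      history, context_block, skill_context, user_message):
    """Build the message list in the documented order (hypothetical helper)."""
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "system", "content": runtime_metadata},
    ]
    if memory_context_text:  # optional: skipped when empty
        messages.append({"role": "system", "content": memory_context_text})
    # Prior turns are copied as role/content pairs.
    messages.extend({"role": m["role"], "content": m["content"]} for m in history)
    messages.append({"role": "system", "content": context_block})
    if skill_context:  # optional: skipped when not provided
        messages.append({"role": "system", "content": skill_context})
    # The new user message always comes last.
    messages.append({"role": "user", "content": user_message})
    return messages


example_messages = assemble_messages(
    "base prompt", "session=s1", "",
    [{"role": "user", "content": "hi"}, {"role": "assistant", "content": "hello"}],
    "[untrusted vault context]", None, "What changed today?",
)
print([m["role"] for m in example_messages])
# ['system', 'system', 'user', 'assistant', 'system', 'user']
```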
Stage 4: Tool-Call Loop
The main loop is _execute_turn_loop_v2(). It runs up to budget.max_steps
iterations.
4a: Step Gate
claim_step() atomically increments a counter and returns False when
max_steps is exhausted. is_expired() checks the wall-clock deadline.
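A step gate of this kind can be sketched with a lock-protected counter. The sketch below is an assumption about the mechanism, not the harness's code: `StepCounter` and `run_loop` are hypothetical names, and the `while ... else` fallback mirrors the behaviour listed under Error Paths.

```python
import threading


class StepCounter:
    """Hypothetical stand-in for the step-claiming part of TurnBudget."""

    def __init__(self, max_steps: int):
        self._max_steps = max_steps
        self._steps = 0
        self._lock = threading.Lock()

    def claim_step(self) -> bool:
        # Check and increment under one lock so two threads cannot both
        # claim the last remaining step.
        with self._lock:
            if self._steps >= self._max_steps:
                return False
            self._steps += 1
            return True


def run_loop(counter, replies):
    """Each reply is final text, or None when the model asked for tool calls."""
    replies = iter(replies)
    while counter.claim_step():
        text = next(replies)
        if text is not None:
            final_text = text
            break
    else:
        # Reached only when claim_step() returned False without a break.
        final_text = "Step limit reached before a final answer."
    return final_text


claims = [StepCounter(max_steps=3).claim_step() for _ in range(1)]
counter = StepCounter(max_steps=3)
claims = [counter.claim_step() for _ in range(5)]
print(claims)  # [True, True, True, False, False]
```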
4b: Optional Tool Selection
If SemanticToolSelector is enabled and the registry has more tools than
max_tools_per_turn, the selector ranks tools by semantic similarity to the
query and trims the list.
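As a rough illustration of ranking and trimming, the sketch below uses cosine similarity over toy two-dimensional vectors. This is an assumption: the document does not say which similarity measure or embedding model `SemanticToolSelector` uses, and `select_tools` is a hypothetical function.

```python
import math


def cosine(a, b):
    """Cosine similarity between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def select_tools(query_vec, tool_vecs, max_tools_per_turn):
    """Return tool names, trimmed to the most similar ones when over the cap."""
    if len(tool_vecs) <= max_tools_per_turn:
        return list(tool_vecs)  # under the cap: nothing to trim
    ranked = sorted(tool_vecs,
                    key=lambda name: cosine(query_vec, tool_vecs[name]),
                    reverse=True)
    return ranked[:max_tools_per_turn]


# Toy embeddings, invented for the example.
toy_tools = {
    "search_vault": [1.0, 0.0],
    "web_search": [0.7, 0.7],
    "send_email": [0.0, 1.0],
}
selected = select_tools([1.0, 0.1], toy_tools, max_tools_per_turn=2)
print(selected)  # ['search_vault', 'web_search']
```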
4c: LLM Generation
```python
result = self._generate_with_span(
    messages, tool_schemas,
    model_name=resolved_model,
    timeout_s=max(10, int(budget.remaining_s())),
)
```
- Wrapped in an OTEL LLM span with prompt attributes.
- If `stream=True` and the provider supports `generate_stream()`, the `_consume_stream()` method collects chunks and fires `on_stream_chunk` callbacks.
- Token counts are accumulated via `state.add_tokens()`.
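The `timeout_s` expression in the call above truncates the remaining budget to whole seconds and floors it at 10. A small worked example, using a hypothetical helper `llm_timeout_s` that reproduces only that expression:

```python
def llm_timeout_s(remaining_s: float, floor_s: int = 10) -> int:
    """Same arithmetic as max(10, int(budget.remaining_s()))."""
    return max(floor_s, int(remaining_s))


print(llm_timeout_s(42.9))  # 42
print(llm_timeout_s(3.7))   # 10
print(llm_timeout_s(0.0))   # 10
```

One consequence of the floor is that a generation call started with less than 10 seconds left is still given a 10 second timeout, so it can run past the turn deadline.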
4d: Terminal Check
If the LLM returns text with no tool calls, the turn is complete.
4e: Wave Scheduling
```python
waves = self._scheduler.schedule(
    result.tool_calls, self.tools,
    parallel_enabled=self._parallel_enabled
)
```
ToolScheduler partitions the calls:
| Rule | Wave type |
|---|---|
| Single call, or `parallel_enabled=False` | Sequential |
| Multiple `READ_ONLY` calls with non-overlapping resource keys | Parallel |
| Any `LOCAL_WRITE` / `NETWORK` / `DESTRUCTIVE` call | Sequential (one call per wave) |
| `READ_ONLY` calls with overlapping resource keys | Split into separate sequential waves |
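The partitioning rules can be sketched as a single pass over the calls. This is one possible reading of the table, not the `ToolScheduler` implementation: `Call`, `Wave`, and `schedule` are hypothetical, and the sketch assumes that with `parallel_enabled=False` every call gets its own sequential wave.

```python
from dataclasses import dataclass


@dataclass
class Call:
    name: str
    side_effect: str                      # "READ_ONLY", "LOCAL_WRITE", "NETWORK", "DESTRUCTIVE"
    resource_keys: frozenset = frozenset()


@dataclass
class Wave:
    calls: list
    parallel: bool


def schedule(calls, parallel_enabled=True):
    if len(calls) <= 1 or not parallel_enabled:
        return [Wave([c], parallel=False) for c in calls]
    waves, batch, used = [], [], set()

    def flush():
        # Emit the pending read-only batch; it is parallel only if it has 2+ calls.
        nonlocal batch, used
        if batch:
            waves.append(Wave(batch, parallel=len(batch) > 1))
        batch, used = [], set()

    for call in calls:
        if call.side_effect != "READ_ONLY":
            flush()
            waves.append(Wave([call], parallel=False))   # one call per wave
        elif used & call.resource_keys:
            flush()                                       # overlap: start a new wave
            batch, used = [call], set(call.resource_keys)
        else:
            batch.append(call)
            used |= call.resource_keys
    flush()
    return waves


demo_calls = [
    Call("read_a", "READ_ONLY", frozenset({"a"})),
    Call("read_b", "READ_ONLY", frozenset({"b"})),
    Call("write_a", "LOCAL_WRITE", frozenset({"a"})),
    Call("read_a2", "READ_ONLY", frozenset({"a"})),
    Call("read_a3", "READ_ONLY", frozenset({"a"})),
]
demo_waves = [([c.name for c in w.calls], w.parallel) for w in schedule(demo_calls)]
```

Here `read_a` and `read_b` share one parallel wave, `write_a` runs alone, and the two reads of key `a` are split into separate sequential waves.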
4f: Wave Execution
Parallel wave:
```python
with ThreadPoolExecutor(max_workers=len(wave.calls)) as pool:
    futures = {pool.submit(_exec_call_v2, tc, ...): tc for tc in wave.calls}
    for fut in as_completed(futures):
        outcome = fut.result()
```
Sequential wave:
After each call, the result is appended with messages.append({"role": "tool", "tool_call_id": ..., "content": ...}),
and _apply_outcome_to_state() extracts citations.
The exception handler guarantees that every ToolCall in result.tool_calls
gets a tool-role message even if the wave executor itself raises unexpectedly.
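The guarantee can be sketched as a backfill in the exception handler. This is a simplified sketch, assuming calls are `(tool_call_id, args)` pairs; `run_parallel_wave` is hypothetical and omits `_exec_call_v2`, typed outcomes, and state updates.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def run_parallel_wave(calls, execute):
    """calls: non-empty list of (tool_call_id, args); execute: callable(args) -> str."""
    messages, answered = [], set()
    try:
        with ThreadPoolExecutor(max_workers=len(calls)) as pool:
            futures = {pool.submit(execute, args): call_id for call_id, args in calls}
            for fut in as_completed(futures):
                call_id = futures[fut]
                content = fut.result()  # re-raises whatever execute() raised
                messages.append({"role": "tool", "tool_call_id": call_id, "content": content})
                answered.add(call_id)
    except Exception as exc:
        # Backfill: every call without a message gets an error message, so no
        # tool_call_id is left unanswered. Simplification: calls that succeeded
        # but had not been collected yet are also reported as errors here.
        for call_id, _ in calls:
            if call_id not in answered:
                messages.append({"role": "tool", "tool_call_id": call_id,
                                 "content": f"error: {exc}"})
    return messages


def flaky_tool(args):
    if args == "boom":
        raise RuntimeError("tool crashed")
    return f"ok:{args}"


wave_messages = run_parallel_wave([("c1", "a"), ("c2", "boom"), ("c3", "b")], flaky_tool)
```

Chat-style APIs commonly reject a conversation in which a tool call has no matching tool-role reply, which is the usual motivation for a backfill of this kind.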
4g: Per-Call Execution Gates (_exec_call_v2)
_exec_call_v2 runs seven steps in order. Steps 1 to 4 are gates that run before
dispatching to BoundedToolExecutor; step 5 is the dispatch itself, and steps 6
and 7 run after it:
| Step | Action on fail / result |
|---|---|
| 1. `ReplayControl.should_skip()` | Return `ToolDenied(reason="duplicate")` |
| 2. Blocked tool check | Return `ToolDenied(reason="blocked")` |
| 3. `on_pre_tool_use` callback | Return `ToolDenied(reason="pre_hook")` |
| 4. `ToolArgValidator.validate()` | Return `ToolDenied(reason="validation")` |
| 5. `BoundedToolExecutor.execute()` | Returns typed `ToolOutcome` |
| 6. `on_post_tool_use` callback | Best-effort, exception swallowed |
| 7. `ReplayControl.record_*()` | Update replay history |
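The chain can be sketched as a sequence of early returns. This is a sketch under assumptions: `exec_call`, `ToolSuccess`, and the callable parameters are hypothetical stand-ins, and only the `ToolDenied` reasons are taken from the table.

```python
from dataclasses import dataclass


@dataclass
class ToolDenied:
    reason: str


@dataclass
class ToolSuccess:          # hypothetical stand-in for a typed ToolOutcome
    content: str


def exec_call(name, args, *, seen, blocked, pre_hook, validate, run, post_hook):
    key = (name, repr(sorted(args.items())))
    if key in seen:                         # 1. replay check
        return ToolDenied("duplicate")
    if name in blocked:                     # 2. blocked tool check
        return ToolDenied("blocked")
    if not pre_hook(name, args):            # 3. pre-tool-use hook
        return ToolDenied("pre_hook")
    if not validate(name, args):            # 4. argument validation
        return ToolDenied("validation")
    outcome = ToolSuccess(run(name, args))  # 5. dispatch
    try:                                    # 6. post hook is best-effort
        post_hook(name, outcome)
    except Exception:
        pass
    seen.add(key)                           # 7. record for replay control
    return outcome


gate_kwargs = dict(
    seen=set(),
    blocked={"rm_rf"},
    pre_hook=lambda name, args: True,
    validate=lambda name, args: "path" in args,
    run=lambda name, args: f"{name} ok",
    post_hook=lambda name, outcome: 1 / 0,  # raises, but must not affect the result
)
first = exec_call("read_file", {"path": "a.txt"}, **gate_kwargs)
second = exec_call("read_file", {"path": "a.txt"}, **gate_kwargs)
third = exec_call("rm_rf", {"path": "/"}, **gate_kwargs)
fourth = exec_call("read_file", {}, **gate_kwargs)
```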
4h: Reflection Step
```python
if self._reflection_engine.should_reflect():
    self._reflection_engine.reflect(messages, tool_results, budget)
```
ReflectionEngine.should_reflect() delegates to ThinkingManager:
- Returns True if ThinkLevel != OFF and (a tool error occurred, or it's
a periodic reflect interval).
- Claims a budget.claim_reflection() slot before calling the LLM.
- Always returns a ReflectionResult (stub on budget/deadline exhaustion).
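The trigger condition can be written as a small predicate. In this sketch the `LOW` and `HIGH` levels and the `interval` of 3 steps are invented for illustration; only `ThinkLevel.OFF` and the two triggers (tool error, periodic interval) come from the description above.

```python
from enum import Enum


class ThinkLevel(Enum):
    OFF = 0
    LOW = 1    # hypothetical level
    HIGH = 2   # hypothetical level


def should_reflect(level, had_tool_error, step, interval=3):
    """True when reflection is enabled and either trigger fires."""
    if level is ThinkLevel.OFF:
        return False
    periodic = step > 0 and step % interval == 0
    return had_tool_error or periodic


print(should_reflect(ThinkLevel.OFF, True, 3))    # False
print(should_reflect(ThinkLevel.LOW, True, 1))    # True
print(should_reflect(ThinkLevel.LOW, False, 3))   # True
print(should_reflect(ThinkLevel.LOW, False, 2))   # False
```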
4i: Context Compaction
If auto_compact=True and the context warning tracker signals a "critical"
token level, ContextCompactor.compact() summarises older messages and resets
the tracker. state.context_was_compacted = True is set for observability.
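One common shape for such a compaction step is to keep the leading system prompt and the most recent messages, and replace everything in between with a summary. The sketch below assumes that shape; `compact`, `keep_recent`, and the `summarise` callable are hypothetical, and the tracker reset is not modelled.

```python
def compact(messages, summarise, keep_recent=4):
    """Return (new_messages, was_compacted)."""
    if len(messages) <= keep_recent + 1:
        return messages, False  # nothing old enough to summarise
    head = messages[:1]                      # leading system prompt
    older = messages[1:-keep_recent]         # candidates for summarising
    recent = messages[-keep_recent:]         # kept verbatim
    summary = {"role": "system",
               "content": "Summary of earlier turns: " + summarise(older)}
    return head + [summary] + recent, True


long_history = [{"role": "system", "content": "base prompt"}] + [
    {"role": "user" if i % 2 == 0 else "assistant", "content": f"msg {i}"}
    for i in range(7)
]
compacted, was_compacted = compact(long_history, lambda ms: f"{len(ms)} messages")
print(len(long_history), len(compacted), was_compacted)  # 8 6 True
```

In a real loop, `summarise` would typically be another LLM call, and the returned flag corresponds to the role `state.context_was_compacted` plays for observability.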
Stage 5: Write Gate Close
Permanently sets _writes_open = False. Any background thread that completes
after this point (late-arriving tool execution) will find the gate closed and
suppress all side effects.
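A one-way gate like this can be sketched with a flag guarded by a lock. This is a minimal sketch: `WriteGate` and `try_write` are hypothetical names, and only the `_writes_open` flag comes from the description above.

```python
import threading


class WriteGate:
    """Hypothetical one-way gate: open at turn start, closed permanently afterwards."""

    def __init__(self):
        self._writes_open = True
        self._lock = threading.Lock()

    def close(self):
        with self._lock:
            self._writes_open = False  # never reopened for this turn

    def try_write(self, sink, item):
        # Check and write under the same lock, so a write cannot slip in
        # between the check and a concurrent close().
        with self._lock:
            if not self._writes_open:
                return False
            sink.append(item)
            return True


gate = WriteGate()
sink = []
before_close = gate.try_write(sink, "citation-1")
gate.close()
after_close = gate.try_write(sink, "late-citation")
print(before_close, after_close, sink)  # True False ['citation-1']
```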
Stage 6: Finalization
```python
return self._finalizer.finalize(
    session_id=session_id,
    user_message=user_message,
    final_text=state.final_text,
    ...
)
```
TurnFinalizer.finalize() executes in order:
- Citation append: if `show_citations=True`, append local/web citation lists to `final_text`.
- Usage callback: fire `on_usage(input_tokens, output_tokens)`.
- Log assistant message: write to event log, capture `assistant_message_id`.
- Memory extraction: call `MemoryExtractor.extract_turn()` (async-safe, exception-wrapped).
- Research learner observation: call `AutonomousResearchLearner.observe_turn()` (exception-wrapped).
- Judge scheduling: if the prompt template has associated judges, `schedule_prompt_judges()` queues async evaluation of faithfulness, relevance, etc.
- Decision record: emit a `RuntimeDecisionRecord` to the decision store classifying the strategy (`direct_answer` / `retrieval_augmented` / `web_augmented` / `tool_assisted`).
- Turn end callback: fire `on_turn_end(session_id)`.
- Return `TurnResult`, the public result dataclass.
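Running the sub-steps in order while isolating their failures can be sketched as below. This is an illustrative pattern, not the `TurnFinalizer` code: `run_best_effort` and the step names are hypothetical.

```python
import logging

logger = logging.getLogger("finalizer_sketch")


def run_best_effort(steps):
    """Run (name, callable) pairs in order; return the names that raised."""
    failed = []
    for name, step in steps:
        try:
            step()
        except Exception:
            # Log and continue: one failing step must not block the rest.
            logger.exception("finalization step %r failed", name)
            failed.append(name)
    return failed


completed = []
failed_steps = run_best_effort([
    ("usage_callback", lambda: completed.append("usage_callback")),
    ("memory_extraction", lambda: 1 / 0),  # simulated failure
    ("turn_end_callback", lambda: completed.append("turn_end_callback")),
])
print(failed_steps)  # ['memory_extraction']
print(completed)     # ['usage_callback', 'turn_end_callback']
```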
Error Paths
| Scenario | What happens |
|---|---|
| Tool raises an exception | Caught inside `BoundedToolExecutor._run_tool_call()`; returns `ToolFailure` |
| Tool exceeds `per_tool_s` | `future.result(timeout=...)` raises `TimeoutError`; returns `ToolTimeout`; background thread is cancelled via `DeadlineToken` |
| Background thread completes late | `state.complete_trace()` returns `False`; thread returns `ToolTimeout(retryable=False)` with no side effects |
| Turn deadline expires mid-loop | `budget.is_expired()` detected at step gate; `state.final_text` set to timeout message |
| `max_steps` exhausted | `budget.claim_step()` returns `False`; while loop's `else` clause sets fallback message |
| Reflection LLM call fails | Caught in `ReflectionEngine`; stub `ReflectionResult` returned; loop continues |
| Finalization step raises | Each sub-step in `TurnFinalizer.finalize()` is individually exception-wrapped; partial failures don't abort the turn result |
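The first two rows can be illustrated with `concurrent.futures`. This is a sketch under assumptions: `run_with_timeout` is hypothetical, outcomes are plain tuples rather than `ToolFailure` / `ToolTimeout`, and a `threading.Event` stands in for `DeadlineToken`. Python threads cannot be stopped from outside, so cancellation here is cooperative: the tool has to check the event itself.

```python
import threading
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeout


def run_with_timeout(tool, per_tool_s):
    """Run tool(cancelled) in a worker thread and classify the outcome."""
    cancelled = threading.Event()  # stand-in for DeadlineToken
    pool = ThreadPoolExecutor(max_workers=1)
    future = pool.submit(tool, cancelled)
    try:
        return ("success", future.result(timeout=per_tool_s))
    except FutureTimeout:
        cancelled.set()  # ask the worker to stop; it must cooperate
        return ("timeout", None)
    except Exception as exc:
        return ("failure", str(exc))
    finally:
        pool.shutdown(wait=False)  # do not block the turn on a slow worker


def slow_tool(cancelled):
    cancelled.wait(5)  # returns early once cancellation is signalled
    return "late"


def fast_tool(cancelled):
    return "done"


def broken_tool(cancelled):
    raise ValueError("bad input")


timeout_outcome = run_with_timeout(slow_tool, per_tool_s=0.05)
success_outcome = run_with_timeout(fast_tool, per_tool_s=1.0)
failure_outcome = run_with_timeout(broken_tool, per_tool_s=1.0)
```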