diff --git a/navi/core/agent.py b/navi/core/agent.py index 60418da..108095b 100644 --- a/navi/core/agent.py +++ b/navi/core/agent.py @@ -29,6 +29,7 @@ from navi.llm.base import LLMBackend, LLMChunk, Message, ToolCallRequest from navi.tools._internal.base import Tool, current_event_sink, current_stop_event +from .agent_run_context import AgentTurnContext from .compressor import ContextCompressor, should_compress from .context_builder import ContextBuilder from .planning import PlanningEngine @@ -718,10 +719,7 @@ await self._sessions.save(session) stop_event = current_stop_event.get() - _turn_start = time.monotonic() - _tool_call_count = 0 - _subagent_tokens = 0 - _turn_tokens = 0 # accumulated tokens across all iterations of this turn + turn_ctx = AgentTurnContext(turn_start=time.monotonic()) # Planning phase — always runs on the first user message in a session; # on subsequent messages uses the profile's planning_enabled flag. @@ -732,7 +730,7 @@ if _is_first_message or profile.planning_enabled: async for _ev in self._planning.run(session.context, profile, llm, mem, tool_schemas, messages=session.messages, force_plan=_force_plan): if isinstance(_ev, AIHelperTokensUsed): - _subagent_tokens += _ev.total + turn_ctx.subagent_tokens += _ev.total elif isinstance(_ev, PlanningDebugData): session.planning_logs.append(_ev.log) # Cap to prevent unbounded DB growth on long sessions @@ -742,22 +740,10 @@ else: yield _ev - # Anti-stall state — tracks consecutive iterations without progress. - # Two independent signals: no todo status change, and repeated identical tool calls. - _stall_no_todo = 0 # iterations since last todo status change - _stall_repeat_tools = 0 # iterations with identical tool calls as the previous turn - _prev_tool_sigs: frozenset[tuple[str, str]] = frozenset() - - # Adaptive re-plan state — detect newly-failed todo steps and inject a - # re-planning prompt on the following iteration so the model revises its plan. 
- _known_failed: frozenset[tuple[int, str]] = frozenset() - _replan_msg: str | None = None - - _injected_fact_ids: set[str] = set() ctx_task = asyncio.create_task(self._ctx_builder._collect_context_injections(profile)) mem_facts_task = asyncio.create_task( self._ctx_builder._memory_facts_msg( - user_message, user_id=session.user_id, injected_ids=_injected_fact_ids + user_message, user_id=session.user_id, injected_ids=turn_ctx.injected_fact_ids ) ) ctx_injections = await ctx_task @@ -832,20 +818,20 @@ _todo_snapshot_before = await _todo_status_snapshot(session_id) # Adaptive re-plan: inject queued re-plan message from previous iteration - if profile.adaptive_replan_enabled and _replan_msg: - built_ctx.append(Message(role="system", content=_replan_msg)) - _replan_msg = None + if profile.adaptive_replan_enabled and turn_ctx.replan_msg: + built_ctx.append(Message(role="system", content=turn_ctx.replan_msg)) + turn_ctx.replan_msg = None if profile.anti_stall_enabled and iteration > 0: stalled = ( - _stall_no_todo >= profile.anti_stall_threshold - or _stall_repeat_tools >= profile.anti_stall_threshold + turn_ctx.stall_no_todo >= profile.anti_stall_threshold + or turn_ctx.stall_repeat_tools >= profile.anti_stall_threshold ) if stalled: reason = ( - f"no todo progress for {_stall_no_todo} iterations" - if _stall_no_todo >= profile.anti_stall_threshold - else f"identical tool calls repeated {_stall_repeat_tools} times" + f"no todo progress for {turn_ctx.stall_no_todo} iterations" + if turn_ctx.stall_no_todo >= profile.anti_stall_threshold + else f"identical tool calls repeated {turn_ctx.stall_repeat_tools} times" ) built_ctx.append(Message( role="system", @@ -894,7 +880,7 @@ break if chunk.prompt_tokens is not None or chunk.completion_tokens is not None: _iter_tokens = (chunk.prompt_tokens or 0) + (chunk.completion_tokens or 0) - _turn_tokens += _iter_tokens + turn_ctx.turn_tokens += _iter_tokens context_tokens = _iter_tokens if chunk.thinking: accumulated_thinking += 
chunk.thinking @@ -926,16 +912,16 @@ if not turn_tool_calls: # Final response — text already streamed above - _elapsed = round(time.monotonic() - _turn_start, 1) + _elapsed = round(time.monotonic() - turn_ctx.turn_start, 1) # Net tokens = accumulated across all iterations + subagent tokens - _net_tokens = _turn_tokens + _subagent_tokens + _net_tokens = turn_ctx.turn_tokens + turn_ctx.subagent_tokens assistant_msg = Message( role="assistant", content=accumulated_text or None, thinking=accumulated_thinking or None, created_at=datetime.now(timezone.utc), elapsed_seconds=_elapsed, - tool_call_count=_tool_call_count if _tool_call_count else None, + tool_call_count=turn_ctx.tool_call_count if turn_ctx.tool_call_count else None, token_count=_net_tokens if _net_tokens else None, ) session.messages.append(assistant_msg) @@ -948,7 +934,7 @@ context_tokens=context_tokens, max_context_tokens=settings.ollama_num_ctx, elapsed_seconds=_elapsed, - tool_call_count=_tool_call_count, + tool_call_count=turn_ctx.tool_call_count, token_count=_net_tokens if _net_tokens else None, ) @@ -997,10 +983,10 @@ if item is _TOOL_DONE: break if isinstance(item, SubagentComplete): - _subagent_tokens += item.token_count - _tool_call_count += item.tool_call_count + turn_ctx.subagent_tokens += item.token_count + turn_ctx.tool_call_count += item.tool_call_count elif isinstance(item, AIHelperTokensUsed): - _subagent_tokens += item.total + turn_ctx.subagent_tokens += item.total else: yield item @@ -1022,7 +1008,7 @@ tool_event, msg, image_msg = r # 5. 
Yield the completed ToolEvent and record in session - _tool_call_count += 1 + turn_ctx.tool_call_count += 1 yield tool_event session.messages.append(msg) session.context.append(msg) @@ -1046,32 +1032,32 @@ if profile.anti_stall_enabled: # Todo progress signal if (await _todo_status_snapshot(session_id)) != _todo_snapshot_before: - _stall_no_todo = 0 + turn_ctx.stall_no_todo = 0 else: - _stall_no_todo += 1 + turn_ctx.stall_no_todo += 1 # Repeated tool call signal cur_sigs = frozenset( (tc.name, json.dumps(tc.arguments, sort_keys=True)) for tc in (turn_tool_calls or []) ) - if cur_sigs and cur_sigs == _prev_tool_sigs: - _stall_repeat_tools += 1 + if cur_sigs and cur_sigs == turn_ctx.prev_tool_sigs: + turn_ctx.stall_repeat_tools += 1 else: - _stall_repeat_tools = 0 - _prev_tool_sigs = cur_sigs + turn_ctx.stall_repeat_tools = 0 + turn_ctx.prev_tool_sigs = cur_sigs # Adaptive re-plan: detect steps that were newly marked failed this iteration. if profile.adaptive_replan_enabled: current_failed = await _todo_failed_steps(session_id) - new_failures = current_failed - _known_failed - _known_failed = current_failed + new_failures = current_failed - turn_ctx.known_failed + turn_ctx.known_failed = current_failed if new_failures: failed_labels = ", ".join( f'step {idx} ("{text}")' for idx, text in sorted(new_failures) ) - _replan_msg = ( + turn_ctx.replan_msg = ( f"[Adaptive re-plan] {failed_labels} just failed. " "Before continuing, revise your plan with the todo tool: either replace the remaining " "pending steps or mark failed/skipped steps with validation. 
Then continue execution "
diff --git a/navi/core/agent_run_context.py b/navi/core/agent_run_context.py
new file mode 100644
index 0000000..22f7f0c
--- /dev/null
+++ b/navi/core/agent_run_context.py
@@ -0,0 +1,40 @@
+"""Turn-level execution state for the Agent streaming loop."""
+
+from __future__ import annotations
+
+from dataclasses import dataclass, field
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+    pass
+
+
+@dataclass
+class AgentTurnContext:
+    """Mutable container for state that lives across iterations of a single run_stream() turn.
+
+    Extracting these variables from run_stream() makes the method readable and lets
+    services (AntiStallMonitor, Compressor) receive the turn state explicitly instead
+    of returning tuples.
+    """
+
+    # time.monotonic() reading taken at turn start; elapsed seconds are measured from it.
+    turn_start: float
+    # Tool calls completed this turn, plus counts reported by SubagentComplete events.
+    tool_call_count: int = 0
+    # Prompt + completion tokens accumulated across all iterations of this turn.
+    turn_tokens: int = 0
+    # Tokens reported by AI-helper and subagent events (kept separate from turn_tokens).
+    subagent_tokens: int = 0
+    # Anti-stall: iterations since the last todo status change.
+    stall_no_todo: int = 0
+    # Anti-stall: iterations whose tool calls were identical to the previously recorded ones.
+    stall_repeat_tools: int = 0
+    # (tool name, JSON-encoded arguments) signatures recorded from an earlier iteration.
+    prev_tool_sigs: frozenset[tuple[str, str]] = field(default_factory=frozenset)
+    # Adaptive re-plan: (step index, step text) of todo steps already seen as failed.
+    known_failed: frozenset[tuple[int, str]] = field(default_factory=frozenset)
+    # Re-plan prompt queued for injection on the next iteration; None when nothing is queued.
+    replan_msg: str | None = None
+    # Handed to the memory-facts call as injected_ids.
+    # NOTE(review): presumably IDs of facts already injected -- confirm against ContextBuilder.
+    injected_fact_ids: set[str] = field(default_factory=set)