diff --git a/navi/core/agent.py b/navi/core/agent.py index 108095b..86ec4bd 100644 --- a/navi/core/agent.py +++ b/navi/core/agent.py @@ -30,6 +30,7 @@ from navi.tools._internal.base import Tool, current_event_sink, current_stop_event from .agent_run_context import AgentTurnContext +from .anti_stall import AntiStallMonitor from .compressor import ContextCompressor, should_compress from .context_builder import ContextBuilder from .planning import PlanningEngine @@ -168,22 +169,6 @@ _SUBAGENT_THINKING_STALL_CHARS = 12_000 -async def _todo_status_snapshot(session_id: str) -> frozenset[tuple[str, str]]: - """Return a frozenset of (task_text, status) for the current session's todo list. - - Used by the anti-stall detector to compare todo state before and after an - iteration — any status change means the model made real progress. - """ - from navi.tools.todo import get_task_snapshot - return await get_task_snapshot(session_id) - - -async def _todo_failed_steps(session_id: str) -> frozenset[tuple[int, str]]: - """Return a frozenset of (1-based index, task_text) for steps currently marked failed.""" - from navi.tools.todo import get_failed_steps - return await get_failed_steps(session_id) - - async def _todo_progress_message(session_id: str, *, first_iteration: bool = False) -> "Message | None": """Build a compact system reminder with current todo state and update discipline.""" from navi.tools.todo import get_progress_message @@ -754,6 +739,10 @@ else: log.debug("agent.memory_facts_none", session_id=session_id) + anti_stall = AntiStallMonitor(profile) + if profile.anti_stall_enabled or profile.adaptive_replan_enabled: + await anti_stall.init(session_id) + # Tool-calling loop — uses stream_complete() for every turn so thinking # is captured in real-time via ThinkingDelta/ThinkingEnd events. for iteration in range(profile.max_iterations): @@ -814,34 +803,10 @@ if todo_msg: built_ctx.append(todo_msg) - # Snapshot todo state before this iteration (for stall detection after) - _todo_snapshot_before = await _todo_status_snapshot(session_id) - - # Adaptive re-plan: inject queued re-plan message from previous iteration - if profile.adaptive_replan_enabled and turn_ctx.replan_msg: - built_ctx.append(Message(role="system", content=turn_ctx.replan_msg)) - turn_ctx.replan_msg = None - - if profile.anti_stall_enabled and iteration > 0: - stalled = ( - turn_ctx.stall_no_todo >= profile.anti_stall_threshold - or turn_ctx.stall_repeat_tools >= profile.anti_stall_threshold - ) - if stalled: - reason = ( - f"no todo progress for {turn_ctx.stall_no_todo} iterations" - if turn_ctx.stall_no_todo >= profile.anti_stall_threshold - else f"identical tool calls repeated {turn_ctx.stall_repeat_tools} times" - ) - built_ctx.append(Message( - role="system", - content=( - f"[Anti-stall warning — {reason}] " - "You are repeating the same actions without making progress. " - "Stop and reconsider: change your approach, try a different tool, " - "mark the current step as failed and move on, or ask the user for guidance." - ), - )) + # Anti-stall / adaptive re-plan: inject intervention message if needed + _stall_msg = await anti_stall.pre_turn(session_id, iteration) + if _stall_msg: + built_ctx.append(_stall_msg) try: self._check_context_size(built_ctx) @@ -1028,43 +993,8 @@ yield StreamStopped() return - # Update anti-stall counters after all tools in this iteration ran. - if profile.anti_stall_enabled: - # Todo progress signal - if (await _todo_status_snapshot(session_id)) != _todo_snapshot_before: - turn_ctx.stall_no_todo = 0 - else: - turn_ctx.stall_no_todo += 1 - - # Repeated tool call signal - cur_sigs = frozenset( - (tc.name, json.dumps(tc.arguments, sort_keys=True)) - for tc in (turn_tool_calls or []) - ) - if cur_sigs and cur_sigs == turn_ctx.prev_tool_sigs: - turn_ctx.stall_repeat_tools += 1 - else: - turn_ctx.stall_repeat_tools = 0 - turn_ctx.prev_tool_sigs = cur_sigs - - # Adaptive re-plan: detect steps that were newly marked failed this iteration. - if profile.adaptive_replan_enabled: - current_failed = await _todo_failed_steps(session_id) - new_failures = current_failed - turn_ctx.known_failed - turn_ctx.known_failed = current_failed - if new_failures: - failed_labels = ", ".join( - f'step {idx} ("{text}")' - for idx, text in sorted(new_failures) - ) - turn_ctx.replan_msg = ( - f"[Adaptive re-plan] {failed_labels} just failed. " - "Before continuing, revise your plan with the todo tool: either replace the remaining " - "pending steps or mark failed/skipped steps with validation. Then continue execution " - "with an approach that accounts for what went wrong." - ) - log.info("agent.adaptive_replan_queued", failures=len(new_failures), - session_id=session_id) + # Update anti-stall counters and adaptive-replan state + await anti_stall.post_turn(session_id, turn_tool_calls) # 7. If switch_profile was called this iteration, reload profile + tools. # switch_profile updates the DB but run_stream() holds a local session diff --git a/navi/core/agent_run_context.py b/navi/core/agent_run_context.py index 22f7f0c..ec956fd 100644 --- a/navi/core/agent_run_context.py +++ b/navi/core/agent_run_context.py @@ -22,9 +22,4 @@ tool_call_count: int = 0 turn_tokens: int = 0 subagent_tokens: int = 0 - stall_no_todo: int = 0 - stall_repeat_tools: int = 0 - prev_tool_sigs: frozenset = field(default_factory=frozenset) - known_failed: frozenset = field(default_factory=frozenset) - replan_msg: str | None = None injected_fact_ids: set[str] = field(default_factory=set) diff --git a/navi/core/anti_stall.py b/navi/core/anti_stall.py new file mode 100644 index 0000000..28537a7 --- /dev/null +++ b/navi/core/anti_stall.py @@ -0,0 +1,116 @@ +"""Anti-stall and adaptive re-plan monitoring for the Agent loop.""" + +from __future__ import annotations + +import json +from dataclasses import dataclass, field + +from navi.llm.base import Message, ToolCallRequest + + +@dataclass +class AntiStallMonitor: + """Tracks stall signals across iterations and builds intervention messages. + + Two independent stall signals: + - No todo progress: consecutive iterations without a todo status change. + - Repeated tool calls: identical tool signatures across consecutive turns. + + Also handles adaptive re-plan: when a todo step is newly marked failed, + a re-planning message is queued for injection on the next iteration. + """ + + profile: object # AgentProfile — avoid circular import + stall_no_todo: int = 0 + stall_repeat_tools: int = 0 + prev_tool_sigs: frozenset = field(default_factory=frozenset) + known_failed: frozenset = field(default_factory=frozenset) + replan_msg: str | None = None + _todo_snapshot: frozenset | None = field(default=None, repr=False) + + async def init(self, session_id: str) -> None: + """Capture the initial todo snapshot so the first post_turn() can detect change.""" + from navi.tools.todo import get_task_snapshot + self._todo_snapshot = await get_task_snapshot(session_id) + + async def pre_turn(self, session_id: str, iteration: int) -> Message | None: + """Return a system message to inject before the LLM call, or None.""" + # Adaptive re-plan: inject queued message from previous iteration + if self.profile.adaptive_replan_enabled and self.replan_msg: + msg = self.replan_msg + self.replan_msg = None + return Message(role="system", content=msg) + + # Anti-stall warning + if self.profile.anti_stall_enabled and iteration > 0: + stalled = ( + self.stall_no_todo >= self.profile.anti_stall_threshold + or self.stall_repeat_tools >= self.profile.anti_stall_threshold + ) + if stalled: + reason = ( + f"no todo progress for {self.stall_no_todo} iterations" + if self.stall_no_todo >= self.profile.anti_stall_threshold + else f"identical tool calls repeated {self.stall_repeat_tools} times" + ) + return Message( + role="system", + content=( + f"[Anti-stall warning — {reason}] " + "You are repeating the same actions without making progress. " + "Stop and reconsider: change your approach, try a different tool, " + "mark the current step as failed and move on, or ask the user for guidance." + ), + ) + + return None + + async def post_turn(self, session_id: str, tool_calls: list[ToolCallRequest]) -> None: + """Update stall counters and adaptive-replan state after tool execution.""" + from navi.tools.todo import get_failed_steps, get_task_snapshot + + # --- Anti-stall: todo progress signal --- + if self.profile.anti_stall_enabled: + before = self._todo_snapshot + current = await get_task_snapshot(session_id) + if before is not None: + if current != before: + self.stall_no_todo = 0 + else: + self.stall_no_todo += 1 + self._todo_snapshot = current + + # Repeated tool call signal + cur_sigs = frozenset( + (tc.name, json.dumps(tc.arguments, sort_keys=True)) + for tc in (tool_calls or []) + ) + if cur_sigs and cur_sigs == self.prev_tool_sigs: + self.stall_repeat_tools += 1 + else: + self.stall_repeat_tools = 0 + self.prev_tool_sigs = cur_sigs + + # --- Adaptive re-plan: detect newly-failed steps --- + if self.profile.adaptive_replan_enabled: + current_failed = await get_failed_steps(session_id) + new_failures = current_failed - self.known_failed + self.known_failed = current_failed + if new_failures: + import structlog + log = structlog.get_logger() + failed_labels = ", ".join( + f'step {idx} ("{text}")' + for idx, text in sorted(new_failures) + ) + self.replan_msg = ( + f"[Adaptive re-plan] {failed_labels} just failed. " + "Before continuing, revise your plan with the todo tool: either replace the remaining " + "pending steps or mark failed/skipped steps with validation. Then continue execution " + "with an approach that accounts for what went wrong." + ) + log.info( + "agent.adaptive_replan_queued", + failures=len(new_failures), + session_id=session_id, + ) diff --git a/tests/unit/core/test_anti_stall.py b/tests/unit/core/test_anti_stall.py new file mode 100644 index 0000000..2fa3eba --- /dev/null +++ b/tests/unit/core/test_anti_stall.py @@ -0,0 +1,171 @@ +"""Unit tests for AntiStallMonitor.""" + +from unittest.mock import AsyncMock, patch + +import pytest + +from navi.core.anti_stall import AntiStallMonitor +from navi.llm.base import Message, ToolCallRequest +from tests.conftest_factory import make_profile + + +class TestPreTurn: + @pytest.mark.asyncio + async def test_no_message_when_not_stalled(self): + profile = make_profile(anti_stall_enabled=True, anti_stall_threshold=8) + monitor = AntiStallMonitor(profile) + result = await monitor.pre_turn("s1", iteration=1) + assert result is None + + @pytest.mark.asyncio + async def test_anti_stall_no_todo(self): + profile = make_profile(anti_stall_enabled=True, anti_stall_threshold=3) + monitor = AntiStallMonitor(profile) + monitor.stall_no_todo = 3 + msg = await monitor.pre_turn("s1", iteration=1) + assert msg is not None + assert msg.role == "system" + assert "no todo progress for 3 iterations" in msg.content + assert "Anti-stall warning" in msg.content + + @pytest.mark.asyncio + async def test_anti_stall_repeat_tools(self): + profile = make_profile(anti_stall_enabled=True, anti_stall_threshold=3) + monitor = AntiStallMonitor(profile) + monitor.stall_repeat_tools = 3 + msg = await monitor.pre_turn("s1", iteration=1) + assert msg is not None + assert "identical tool calls repeated 3 times" in msg.content + + @pytest.mark.asyncio + async def test_anti_stall_skipped_on_first_iteration(self): + profile = make_profile(anti_stall_enabled=True, anti_stall_threshold=1) + monitor = AntiStallMonitor(profile) + monitor.stall_no_todo = 10 + msg = await monitor.pre_turn("s1", iteration=0) + assert msg is None + + @pytest.mark.asyncio + async def test_adaptive_replan_injected_before_anti_stall(self): + """When both replan_msg and stall are queued, replan takes precedence.""" + profile = make_profile( + anti_stall_enabled=True, + anti_stall_threshold=1, + adaptive_replan_enabled=True, + ) + monitor = AntiStallMonitor(profile) + monitor.replan_msg = "Please replan." + monitor.stall_no_todo = 5 + msg = await monitor.pre_turn("s1", iteration=1) + assert msg is not None + assert "Please replan." in msg.content + assert monitor.replan_msg is None # consumed + + @pytest.mark.asyncio + async def test_disabled_anti_stall_returns_none(self): + profile = make_profile(anti_stall_enabled=False, adaptive_replan_enabled=False) + monitor = AntiStallMonitor(profile) + monitor.stall_no_todo = 100 + msg = await monitor.pre_turn("s1", iteration=5) + assert msg is None + + +class TestPostTurn: + @pytest.mark.asyncio + async def test_todo_progress_resets_stall_counter(self): + profile = make_profile(anti_stall_enabled=True, anti_stall_threshold=8) + monitor = AntiStallMonitor(profile) + monitor.stall_no_todo = 5 + monitor._todo_snapshot = frozenset({("task1", "pending")}) + with patch( + "navi.tools.todo.get_task_snapshot", + new=AsyncMock(return_value=frozenset({("task1", "done")})), + ): + await monitor.post_turn("s1", []) + assert monitor.stall_no_todo == 0 + + @pytest.mark.asyncio + async def test_no_todo_progress_increments_stall_counter(self): + profile = make_profile(anti_stall_enabled=True, anti_stall_threshold=8) + monitor = AntiStallMonitor(profile) + snapshot = frozenset({("task1", "pending")}) + monitor._todo_snapshot = snapshot + with patch( + "navi.tools.todo.get_task_snapshot", + new=AsyncMock(return_value=snapshot), + ): + await monitor.post_turn("s1", []) + assert monitor.stall_no_todo == 1 + + @pytest.mark.asyncio + async def test_repeated_tool_calls_increment_repeat_counter(self): + profile = make_profile(anti_stall_enabled=True, anti_stall_threshold=8) + monitor = AntiStallMonitor(profile) + tc = ToolCallRequest(id="1", name="fs", arguments={"path": "/tmp"}) + # first call sets baseline + with patch( + "navi.tools.todo.get_task_snapshot", + new=AsyncMock(return_value=frozenset()), + ): + await monitor.post_turn("s1", [tc]) + assert monitor.stall_repeat_tools == 0 + # identical call again + with patch( + "navi.tools.todo.get_task_snapshot", + new=AsyncMock(return_value=frozenset()), + ): + await monitor.post_turn("s1", [tc]) + assert monitor.stall_repeat_tools == 1 + + @pytest.mark.asyncio + async def test_different_tool_calls_reset_repeat_counter(self): + profile = make_profile(anti_stall_enabled=True, anti_stall_threshold=8) + monitor = AntiStallMonitor(profile) + tc1 = ToolCallRequest(id="1", name="fs", arguments={"path": "/tmp"}) + tc2 = ToolCallRequest(id="2", name="fs", arguments={"path": "/other"}) + with patch( + "navi.tools.todo.get_task_snapshot", + new=AsyncMock(return_value=frozenset()), + ): + await monitor.post_turn("s1", [tc1]) + await monitor.post_turn("s1", [tc2]) + assert monitor.stall_repeat_tools == 0 + + @pytest.mark.asyncio + async def test_adaptive_replan_queues_message(self): + profile = make_profile(adaptive_replan_enabled=True) + monitor = AntiStallMonitor(profile) + with patch( + "navi.tools.todo.get_task_snapshot", + new=AsyncMock(return_value=frozenset()), + ), patch( + "navi.tools.todo.get_failed_steps", + new=AsyncMock(return_value=frozenset({(1, "step A")})), + ): + await monitor.post_turn("s1", []) + assert monitor.replan_msg is not None + assert "step 1" in monitor.replan_msg + assert "step A" in monitor.replan_msg + + @pytest.mark.asyncio + async def test_adaptive_replan_no_new_failures(self): + profile = make_profile(adaptive_replan_enabled=True) + monitor = AntiStallMonitor(profile) + monitor.known_failed = frozenset({(1, "step A")}) + with patch( + "navi.tools.todo.get_task_snapshot", + new=AsyncMock(return_value=frozenset()), + ), patch( + "navi.tools.todo.get_failed_steps", + new=AsyncMock(return_value=frozenset({(1, "step A")})), + ): + await monitor.post_turn("s1", []) + assert monitor.replan_msg is None + + @pytest.mark.asyncio + async def test_disabled_anti_stall_does_not_fetch_snapshot(self): + profile = make_profile(anti_stall_enabled=False, adaptive_replan_enabled=False) + monitor = AntiStallMonitor(profile) + with patch("navi.tools.todo.get_task_snapshot") as mock_snap: + await monitor.post_turn("s1", []) + mock_snap.assert_not_called()