"""Anti-stall and adaptive re-plan monitoring for the Agent loop."""
from __future__ import annotations
import json
from dataclasses import dataclass, field
from navi.llm.base import Message, ToolCallRequest
@dataclass
class AntiStallMonitor:
    """Per-session watchdog that spots a stalled Agent loop and queues re-plan hints.

    Stall detection relies on two separate counters:

    - ``stall_no_todo``: consecutive turns in which the todo snapshot did not change.
    - ``stall_repeat_tools``: consecutive turns whose set of tool-call signatures
      equals the previous turn's set.

    Adaptive re-plan: when ``post_turn()`` sees failed steps that were not in
    ``known_failed``, it stores a message in ``replan_msg``; the next
    ``pre_turn()`` hands that message out and clears it.
    """

    # Expected to be an AgentProfile; typed as ``object`` to avoid a circular import.
    # Attributes read here: anti_stall_enabled, anti_stall_threshold,
    # adaptive_replan_enabled.
    profile: object
    # Turns in a row without a todo snapshot change.
    stall_no_todo: int = 0
    # Turns in a row with the same tool-call signature set.
    stall_repeat_tools: int = 0
    # Signature set, (name, JSON-encoded arguments) pairs, from the previous turn.
    prev_tool_sigs: frozenset = field(default_factory=frozenset)
    # Failed steps already observed, so only new failures trigger a re-plan.
    known_failed: frozenset = field(default_factory=frozenset)
    # Re-plan text waiting to be injected by the next pre_turn(), if any.
    replan_msg: str | None = None
    # Last todo snapshot seen; None until init() or the first post_turn() runs.
    _todo_snapshot: frozenset | None = field(default=None, repr=False)

    async def init(self, session_id: str) -> None:
        """Record the starting todo snapshot so the first post_turn() has a baseline."""
        # Imported lazily, as elsewhere in this class.
        from navi.tools.todo import get_task_snapshot

        baseline = await get_task_snapshot(session_id)
        self._todo_snapshot = baseline

    async def pre_turn(self, session_id: str, iteration: int) -> Message | None:
        """Produce the system message to inject ahead of the LLM call, if any.

        A queued re-plan message wins over a stall warning and is consumed when
        returned. A stall warning is only considered when anti-stall is enabled
        and ``iteration`` is greater than zero. Returns None when there is
        nothing to inject.
        """
        cfg = self.profile

        # Re-plan text queued by the previous post_turn() is delivered once.
        if cfg.adaptive_replan_enabled and self.replan_msg:
            queued, self.replan_msg = self.replan_msg, None
            return Message(role="system", content=queued)

        if not (cfg.anti_stall_enabled and iteration > 0):
            return None

        limit = cfg.anti_stall_threshold
        todo_stalled = self.stall_no_todo >= limit
        tools_stalled = self.stall_repeat_tools >= limit
        if not (todo_stalled or tools_stalled):
            return None

        # When both signals fire, the todo signal is the one reported.
        if todo_stalled:
            reason = f"no todo progress for {self.stall_no_todo} iterations"
        else:
            reason = f"identical tool calls repeated {self.stall_repeat_tools} times"

        warning = (
            f"[Anti-stall warning — {reason}] "
            "You are repeating the same actions without making progress. "
            "Stop and reconsider: change your approach, try a different tool, "
            "mark the current step as failed and move on, or ask the user for guidance."
        )
        return Message(role="system", content=warning)

    async def post_turn(self, session_id: str, tool_calls: list[ToolCallRequest]) -> None:
        """Refresh the stall counters and the re-plan queue once tools have run.

        ``tool_calls`` may be empty or None; either resets the repeated-tool counter.
        """
        from navi.tools.todo import get_failed_steps, get_task_snapshot

        if self.profile.anti_stall_enabled:
            # -- Signal 1: todo progress --
            previous = self._todo_snapshot
            latest = await get_task_snapshot(session_id)
            # A falsy baseline (never captured, or an empty snapshot) is not
            # counted, so sessions with no todo list never accumulate a todo stall.
            if previous:
                self.stall_no_todo = 0 if latest != previous else self.stall_no_todo + 1
            self._todo_snapshot = latest

            # -- Signal 2: identical tool calls --
            # sort_keys makes the encoded arguments independent of dict key order.
            signatures = frozenset(
                (call.name, json.dumps(call.arguments, sort_keys=True))
                for call in (tool_calls or [])
            )
            repeated = bool(signatures) and signatures == self.prev_tool_sigs
            self.stall_repeat_tools = self.stall_repeat_tools + 1 if repeated else 0
            self.prev_tool_sigs = signatures

        if not self.profile.adaptive_replan_enabled:
            return

        # -- Adaptive re-plan: react only to failures not seen before --
        failed_now = await get_failed_steps(session_id)
        fresh = failed_now - self.known_failed
        self.known_failed = failed_now
        if not fresh:
            return

        import structlog

        logger = structlog.get_logger()
        # Each failure is unpacked as an (index, text) pair.
        labels = ", ".join(
            f'step {idx} ("{text}")'
            for idx, text in sorted(fresh)
        )
        self.replan_msg = (
            f"[Adaptive re-plan] {labels} just failed. "
            "Before continuing, revise your plan with the todo tool: either replace the remaining "
            "pending steps or mark failed/skipped steps with validation. Then continue execution "
            "with an approach that accounts for what went wrong."
        )
        logger.info(
            "agent.adaptive_replan_queued",
            failures=len(fresh),
            session_id=session_id,
        )