"""
Agent: the tool-calling loop.
Flow:
1. Receive user message, load session + profile
2. Build tool schemas from profile's enabled_tools
3. Loop (up to max_iterations):
a. Call LLM with session.context (may be compressed) + tool schemas
b. If finish_reason == "stop" -> stream final response
c. If finish_reason == "tool_calls" -> execute tools, append to both
session.messages (display history) and session.context (LLM context)
4. After StreamEnd: run workers sequentially (e.g. context compression)
session.messages — full display history, never compressed
session.context — what the LLM sees; workers may compress this
"""
import asyncio
import json
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import TYPE_CHECKING, AsyncGenerator
import structlog
from navi.config import settings
from navi.exceptions import ContextTooLargeError, LLMBackendError, LLMConnectionError, MaxIterationsReached, SessionNotFound
from navi.llm.base import LLMBackend, LLMChunk, Message, ToolCallRequest
from navi.tools.base import Tool, current_event_sink, current_stop_event
from .compressor import compress_context, should_compress
from .context_builder import ContextBuilder
from .planning import PlanningEngine
from .events import (
AgentEvent,
AIHelperTokensUsed,
ContextCompressed,
PlanningDebugData,
PlanningStatus,
PlanReady,
StreamEnd,
StreamStopped,
SubagentComplete,
TextDelta,
ThinkingDelta,
ThinkingEnd,
ToolEvent,
ToolStarted,
TurnThinking,
)
from .registry import BackendRegistry, ProfileRegistry, ToolRegistry
from .session import SessionStore
from .tool_executor import ToolExecutor
if TYPE_CHECKING:
from navi.context_providers._loader import ContextProviderRegistry
from navi.memory.store import MemoryStore
from navi.workers.base import Worker, WorkerContext
# JSON file under the configured tools directory holding the user's list of
# enabled tool names; read by _load_user_enabled_tools().
_USER_ENABLED_FILE = Path(settings.tools_dir) / "enabled.json"
async def _iter_stream_guarded(
    stream_gen: "AsyncGenerator[LLMChunk, None]",
    stop_event: "asyncio.Event | None",
    first_chunk_timeout: float,
    chunk_timeout: float,
) -> "AsyncGenerator[LLMChunk, None]":
    """
    Wraps a streaming LLM generator with two safety mechanisms:
    1. Stop-event responsiveness during prefill.
    Normally, the agent only checks stop_event *between* chunks. During the
    prefill phase (processing input tokens) Ollama emits no chunks at all —
    the first await can block for minutes on large contexts. This wrapper polls
    stop_event every second so the user's Stop button works even then.
    2. Timeouts as a last-resort safety net.
    first_chunk_timeout: how long to wait for the first token (prefill).
    chunk_timeout: max gap between subsequent tokens.
    On timeout the generator is closed, which terminates the HTTP connection
    to Ollama → Ollama halts generation → GPU load drops to idle.

    Args:
        stream_gen: the backend's chunk generator. The wrapper's finally
            block always calls its aclose().
        stop_event: optional cooperative stop flag. When it is set, the wrapper
            returns quietly (no exception). It is checked once per second
            while waiting for a chunk and again after every yielded chunk.
        first_chunk_timeout: seconds to wait for the first chunk.
        chunk_timeout: seconds to wait for each later chunk.

    Yields:
        The chunks of stream_gen, unchanged.

    Raises:
        LLMBackendError: when a timeout expires. Waiting is counted in
            1-second polling steps, so the effective resolution is ~1s.
        Exceptions raised by stream_gen while producing a chunk propagate
        unchanged (via chunk_task.result()).
    """
    first = True
    chunk_task: asyncio.Task | None = None
    try:
        while True:
            timeout = first_chunk_timeout if first else chunk_timeout
            # Create one task per chunk; reuse across poll iterations so we
            # don't accidentally start multiple concurrent __anext__ calls.
            chunk_task = asyncio.ensure_future(stream_gen.__anext__())
            elapsed = 0.0
            while True:
                # Wake at least once per second to re-check stop / timeout.
                done, _ = await asyncio.wait({chunk_task}, timeout=1.0)
                if done:
                    break
                elapsed += 1.0
                if stop_event and stop_event.is_set():
                    # Cancel the pending __anext__ and wait for it to unwind,
                    # so the generator is no longer running when finally
                    # calls aclose() on it.
                    chunk_task.cancel()
                    try:
                        await chunk_task
                    except (asyncio.CancelledError, Exception):
                        # Expected: the cancelled task re-raises on await.
                        pass
                    chunk_task = None
                    return
                if elapsed >= timeout:
                    # Same cancel-and-drain as above, then report the timeout.
                    chunk_task.cancel()
                    try:
                        await chunk_task
                    except (asyncio.CancelledError, Exception):
                        pass
                    chunk_task = None
                    label = "first token (context may be too large for this model)" if first else "next token"
                    raise LLMBackendError(
                        f"LLM timed out after {elapsed:.0f}s waiting for {label}."
                    )
            try:
                # Re-raises any backend error; StopAsyncIteration = normal end.
                chunk = chunk_task.result()
            except StopAsyncIteration:
                chunk_task = None
                return
            chunk_task = None
            first = False
            yield chunk
            # The consumer may have set the stop flag while handling the chunk.
            if stop_event and stop_event.is_set():
                return
    finally:
        # Cancel any in-flight __anext__ task so we don't leave a zombie
        # coroutine holding an open HTTP connection to Ollama.
        if chunk_task is not None and not chunk_task.done():
            chunk_task.cancel()
            try:
                await chunk_task
            except (asyncio.CancelledError, Exception):
                pass
        # Closing the generator terminates the HTTP connection to Ollama,
        # which signals it to stop generating (GPU returns to idle).
        try:
            await stream_gen.aclose()
        except Exception:
            # Best-effort: a failure while closing must not mask the outcome.
            pass
def _load_user_enabled_tools() -> list[str]:
try:
return json.loads(_USER_ENABLED_FILE.read_text())
except Exception:
return []
# Module-level structlog logger used throughout this file.
log = structlog.get_logger()
# Sentinel: placed in the event sink by the tool wrapper to signal completion.
_TOOL_DONE = object()
# Sub-agents are execution workers. If a sub-agent produces only thinking for a
# long time without text or tool calls, local models can degenerate into endless
# internal-token loops and keep the GPU busy with no user-visible progress.
# run_ephemeral() aborts a turn when either limit below is reached. Both are
# measured from the first thinking chunk of the turn and are checked each time
# a new thinking chunk arrives.
_SUBAGENT_THINKING_STALL_SECONDS = 60.0  # wall-clock seconds of thinking
_SUBAGENT_THINKING_STALL_CHARS = 12_000  # accumulated thinking characters
def _todo_status_snapshot(session_id: str) -> frozenset[tuple[str, str]]:
    """Snapshot the session's todo list as ``(task_text, status)`` pairs.

    The anti-stall detector takes one snapshot before an iteration and one
    after it. Any difference means a task changed status, i.e. the model made
    real progress. The todo tool module is imported at call time (presumably
    to avoid an import cycle).
    """
    from navi.tools.todo import get_task_snapshot as _snapshot_of

    snapshot = _snapshot_of(session_id)
    return snapshot
def _todo_failed_steps(session_id: str) -> frozenset[tuple[int, str]]:
    """Collect the session's todo steps that are currently marked failed.

    Each entry is ``(index, task_text)`` with a 1-based index. The work is
    delegated to the todo tool module, which is imported at call time.
    """
    from navi.tools.todo import get_failed_steps as _failed_for

    failed = _failed_for(session_id)
    return failed
def _todo_progress_message(session_id: str, *, first_iteration: bool = False) -> "Message | None":
    """Return a compact system reminder of the todo state, or ``None``.

    The reminder carries the current todo state and the update discipline.
    ``first_iteration`` is forwarded unchanged to the todo tool module, which
    defines its exact effect.
    """
    from navi.tools.todo import get_progress_message as _progress_for

    reminder = _progress_for(session_id, first_iteration=first_iteration)
    return reminder
class Agent:
def __init__(
self,
session_store: "SessionStore | None",
profile_registry: ProfileRegistry,
tool_registry: ToolRegistry,
backend_registry: BackendRegistry,
workers: list["Worker"] | None = None,
memory_store: "MemoryStore | None" = None,
cp_registry: "ContextProviderRegistry | None" = None,
mcp_manager=None,
) -> None:
self._sessions = session_store
self._profiles = profile_registry
self._tools = tool_registry
self._backends = backend_registry
self._workers: list["Worker"] = workers or []
self._memory = memory_store
self._cp_registry = cp_registry
self._mcp_manager = mcp_manager
self._ctx_builder = ContextBuilder(
profile_registry=profile_registry,
memory_store=memory_store,
cp_registry=cp_registry,
mcp_manager=mcp_manager,
)
self._tool_executor = ToolExecutor(tool_registry)
self._planning = PlanningEngine(self._ctx_builder)
# ------------------------------------------------------------------
# Public interface
# ------------------------------------------------------------------
async def run(self, session_id: str, user_message: str, images: list[str] | None = None) -> str:
"""Non-streaming: run the full tool-calling loop and return the final text."""
session = await self._sessions.get(session_id)
if session is None:
raise SessionNotFound(session_id)
profile = self._profiles.get(session.profile_id)
tools = self._tool_list(profile.enabled_tools, profile.mcp_servers)
tool_schemas = [t.schema() for t in tools]
llm = self._get_backend(profile.llm_backend)
mem = await self._ctx_builder._memory_msg(user_id=session.user_id)
# Expose session_id to tools (e.g. SSH connection pool) via ContextVar
from navi.tools.base import current_session_id as _sid_var
_sid_var.set(session_id)
# current_user_id and current_user_role are set by the caller
# (websocket_session or messages endpoint) before run()/run_stream()
user_msg = Message(role="user", content=user_message, images=images or None,
created_at=datetime.now(timezone.utc))
session.messages.append(user_msg)
session.context.append(user_msg)
await self._sessions.save(session)
_injected_fact_ids: set[str] = set()
ctx_task = asyncio.create_task(self._ctx_builder._collect_context_injections(profile))
mem_facts_task = asyncio.create_task(
self._ctx_builder._memory_facts_msg(
user_message, user_id=session.user_id, injected_ids=_injected_fact_ids
)
)
ctx_injections = await ctx_task
mem_facts = await mem_facts_task
if mem_facts:
ctx_injections.append(mem_facts)
log.debug("agent.memory_facts_injected", session_id=session_id, facts_msg_length=len(mem_facts.content or ""))
else:
log.debug("agent.memory_facts_none", session_id=session_id)
for iteration in range(profile.max_iterations):
log.debug("agent.iteration", session_id=session_id, iteration=iteration)
response = await llm.complete(
self._ctx_builder.build(session.context, profile, mem,
iteration=iteration, max_iterations=profile.max_iterations,
extra_system=ctx_injections,
session_id=session_id),
tools=tool_schemas if tools else None,
temperature=profile.temperature,
model=profile.model,
think=profile.think_enabled,
top_k=profile.top_k,
top_p=profile.top_p,
num_thread=profile.num_thread,
)
if response.finish_reason == "stop" or not response.tool_calls:
content = response.content or ""
assistant_msg = Message(role="assistant", content=content,
created_at=datetime.now(timezone.utc))
session.messages.append(assistant_msg)
session.context.append(assistant_msg)
await self._sessions.save(session)
return content
# Tool calls turn — append to both messages and context
assistant_msg = Message(
role="assistant",
content=response.content,
tool_calls=response.tool_calls,
)
session.messages.append(assistant_msg)
session.context.append(assistant_msg)
tool_results, image_injections = await self._tool_executor._execute_tool_calls(response.tool_calls, tools)
session.messages.extend(tool_results)
session.context.extend(tool_results)
# Image injections are synthetic LLM helpers — context only
session.context.extend(image_injections)
await self._sessions.save(session)
raise MaxIterationsReached(profile.max_iterations)
async def run_ephemeral(
self,
user_message: str,
profile_id: str,
max_iterations: int = 40,
exclude_tools: list[str] | None = None,
briefing: str | None = None,
custom_system_prompt: str | None = None,
context_transfer: str | None = None,
parent_session_id: str | None = None,
timeout_seconds: float = 300.0,
) -> tuple[str, bool]:
"""
Run a sub-agent loop without a persistent session.
Returns (result_text, completed_normally).
completed_normally is False if the sub-agent hit the iteration limit or timed out.
Intended for spawning from tools (e.g. SpawnAgentTool).
No DB reads/writes — uses a temporary in-memory context.
Tools listed in exclude_tools are stripped from the tool list
(use this to prevent recursion: exclude 'spawn_agent').
System prompt structure (completely separate from the parent's system prompt):
1. profile.subagent_system_prompt — focused executor persona
2. custom_system_prompt — optional role specialisation for this task
3. briefing — task context (credentials, paths, instructions)
context_transfer: text from the parent's scratchpad context_transfer section,
injected as a priming exchange before the task message.
parent_session_id: parent chat session id. When provided, sub-agent
tool calls run in that session context so session-aware tools resolve
filenames against the user's session directory, not an ephemeral id.
timeout_seconds: wall-clock timeout for the entire sub-agent run.
"""
import time as _time
import uuid as _uuid
from navi.tools.base import (
current_session_id as _sid_var,
current_model as _model_var,
current_user_id as _uid_var,
current_user_role as _role_var,
current_user_info as _uinfo_var,
)
_prev_sid = _sid_var.get(None)
_prev_model = _model_var.get(None)
_prev_uid = _uid_var.get(None)
_prev_role = _role_var.get()
_prev_uinfo = _uinfo_var.get(None)
subagent_run_id = f"subagent_{_uuid.uuid4().hex[:12]}"
tool_session_id = parent_session_id or subagent_run_id
_sid_var.set(tool_session_id)
profile = self._profiles.get(profile_id)
_model_var.set(profile.model)
exclude = set(exclude_tools or [])
# Use dedicated subagent_tools if configured, else fall back to enabled_tools.
tool_source = profile.subagent_tools if profile.subagent_tools else profile.enabled_tools
tools = [t for t in self._tool_list(tool_source, profile.mcp_servers) if t.name not in exclude]
tool_schemas = [t.schema() for t in tools]
llm = self._get_backend(profile.llm_backend)
user_id = None
if parent_session_id and self._sessions:
parent_session = await self._sessions.get(parent_session_id)
if parent_session:
user_id = parent_session.user_id
if user_id is not None:
_uid_var.set(user_id)
_role_var.set(_prev_role or "user")
_uinfo_var.set(_prev_uinfo)
else:
_uid_var.set(None)
_role_var.set("user")
_uinfo_var.set(None)
mem = await self._ctx_builder._memory_msg(user_id=user_id)
# Build subagent system prompt — completely separate from the parent's system prompt.
# No persona, no orchestrator instructions, no profiles block.
# Structure: executor persona → role specialisation → task context (briefing)
sys_parts: list[str] = []
if profile.subagent_system_prompt:
sys_parts.append(profile.subagent_system_prompt)
if custom_system_prompt:
sys_parts.append(custom_system_prompt)
if briefing:
sys_parts.append(f"## Task context\n\n{briefing}")
if parent_session_id:
sys_parts.append(
"[Parent session context]\n"
f"Parent Session ID: {parent_session_id}\n"
f"Session files directory: {settings.session_files_dir}/{parent_session_id}/\n"
"For files the user should see, write to this exact session directory. "
"Do not use or invent a subagent_* directory."
)
if not sys_parts:
# Fallback if profile has no subagent_system_prompt defined
sys_parts.append(profile.system_prompt)
subagent_sys_msg = Message(role="system", content="\n\n---\n\n".join(sys_parts))
# Build initial context.
# If context_transfer is provided, inject it as a priming exchange so the
# sub-agent has the parent's working state from the start.
context: list[Message] = []
if context_transfer:
context.append(Message(
role="user",
content=f"## Context from parent agent\n\n{context_transfer}",
))
context.append(Message(
role="assistant",
content="Understood. I have the context. Ready to begin the task.",
))
context.append(Message(role="user", content=user_message, created_at=datetime.now(timezone.utc)))
# Read the event sink set by the parent run_stream() for this tool call.
# If None (e.g. called from run(), not run_stream()), events are silently dropped.
sink = current_event_sink.get()
log.info("agent.subagent.start", profile_id=profile_id, max_iterations=max_iterations,
tools=len(tools), planning=profile.subagent_planning_enabled)
stop_event = current_stop_event.get()
tool_map = {t.name: t for t in tools}
subagent_think = (
profile.subagent_think_enabled
if profile.subagent_think_enabled is not None
else profile.think_enabled
)
_sub_tokens: int = 0 # tokens from the final LLM call
_sub_tool_count: int = 0 # total tool calls across all iterations
_start_time = _time.monotonic()
accumulated_text = ""
try:
# ── Optional planning phase ────────────────────────────────────────────
if profile.subagent_planning_enabled:
async for _ev in self._planning.run(
context, profile, llm, mem, tool_schemas,
system_prompt_override=subagent_sys_msg.content,
is_subagent=True,
):
if isinstance(_ev, AIHelperTokensUsed):
pass # token accounting only, not forwarded
elif sink is not None:
await sink.put(_ev)
# ── Tool-calling loop ──────────────────────────────────────────────────
for iteration in range(max_iterations):
if stop_event and stop_event.is_set():
return accumulated_text, False
elapsed = _time.monotonic() - _start_time
if elapsed >= timeout_seconds:
log.warning("agent.subagent.timeout", elapsed=elapsed, timeout=timeout_seconds)
if sink is not None:
await sink.put(SubagentComplete(token_count=_sub_tokens, tool_call_count=_sub_tool_count))
return accumulated_text or "[Sub-agent timed out]", False
log.debug("agent.subagent.iteration", iteration=iteration)
accumulated_text = ""
accumulated_thinking = ""
turn_tool_calls: list[ToolCallRequest] | None = None
turn_tokens: int | None = None
thinking_started_at: float | None = None
thinking_stalled_reason: str | None = None
# Build context inline — no persona or profiles block for subagents.
built_ctx: list[Message] = [subagent_sys_msg]
if mem:
built_ctx.append(mem)
mcp_msg = self._ctx_builder._mcp_context_msg(profile)
if mcp_msg:
built_ctx.append(mcp_msg)
built_ctx.extend(m for m in context if m.role != "system")
self._check_context_size(built_ctx)
async for chunk in _iter_stream_guarded(
llm.stream_complete(
built_ctx,
tools=tool_schemas if tools else None,
temperature=profile.temperature,
model=profile.model,
think=subagent_think,
top_k=profile.top_k,
top_p=profile.top_p,
num_thread=profile.num_thread,
),
stop_event=stop_event,
first_chunk_timeout=settings.llm_stream_first_chunk_timeout,
chunk_timeout=settings.llm_stream_chunk_timeout,
):
if chunk.prompt_tokens is not None or chunk.completion_tokens is not None:
turn_tokens = (chunk.prompt_tokens or 0) + (chunk.completion_tokens or 0)
if chunk.thinking:
if thinking_started_at is None:
thinking_started_at = _time.monotonic()
accumulated_thinking += chunk.thinking
thinking_elapsed = _time.monotonic() - thinking_started_at
if (
thinking_elapsed >= _SUBAGENT_THINKING_STALL_SECONDS
or len(accumulated_thinking) >= _SUBAGENT_THINKING_STALL_CHARS
):
thinking_stalled_reason = (
"Sub-agent produced only thinking output for "
f"{thinking_elapsed:.0f}s / {len(accumulated_thinking)} chars "
"without text or tool calls."
)
log.warning(
"agent.subagent.thinking_stall",
elapsed=thinking_elapsed,
chars=len(accumulated_thinking),
profile_id=profile_id,
)
break
if chunk.delta:
accumulated_text += chunk.delta
if chunk.tool_calls:
turn_tool_calls = chunk.tool_calls
if stop_event and stop_event.is_set():
return accumulated_text, False
if thinking_stalled_reason:
if sink is not None:
await sink.put(SubagentComplete(
token_count=turn_tokens or 0,
tool_call_count=_sub_tool_count,
))
return f"[{thinking_stalled_reason}]", False
if not turn_tool_calls:
log.info("agent.subagent.complete", iterations=iteration + 1,
result_len=len(accumulated_text))
_sub_tokens = turn_tokens or 0
if sink is not None:
await sink.put(SubagentComplete(
token_count=_sub_tokens,
tool_call_count=_sub_tool_count,
))
return accumulated_text, True
# Emit accumulated thinking before tool calls
if accumulated_thinking and sink is not None:
log.debug("agent.subagent.turn_thinking", length=len(accumulated_thinking))
await sink.put(TurnThinking(thinking=accumulated_thinking, is_subagent=True))
context.append(Message(
role="assistant",
content=accumulated_text or None,
tool_calls=turn_tool_calls,
))
# Execute each tool call sequentially, emitting events to parent sink
for tc in turn_tool_calls:
_sub_tool_count += 1
if sink is not None:
await sink.put(ToolStarted(
tool_name=tc.name, arguments=tc.arguments, is_subagent=True
))
tool = tool_map.get(tc.name)
image_msg = None
metadata: dict = {}
if tool is None:
content = f"Error: tool '{tc.name}' not found."
success = False
else:
log.info("tool.execute.subagent", tool=tc.name, args=tc.arguments)
try:
result = await tool.execute(tc.arguments)
content = result.to_message_content()
success = result.success
metadata = result.metadata or {}
if result.success and result.metadata and result.metadata.get("is_image"):
b64 = result.metadata.get("base64")
if b64:
image_msg = Message(
role="user",
content=f"[Image loaded via {tc.name} — analyse it]",
images=[b64],
)
except Exception as exc:
log.warning("agent.subagent.tool_exception", tool=tc.name, error=str(exc))
content = f"Error: {exc}"
success = False
metadata = {}
if sink is not None:
await sink.put(ToolEvent(
tool_name=tc.name, arguments=tc.arguments,
result=content, success=success, is_subagent=True,
))
context.append(Message(role="tool", content=content,
tool_call_id=tc.id, name=tc.name, metadata=metadata))
if image_msg:
context.append(image_msg)
log.warning("agent.subagent.max_iterations", max_iterations=max_iterations)
if sink is not None:
await sink.put(SubagentComplete(token_count=_sub_tokens, tool_call_count=_sub_tool_count))
return accumulated_text or "[Sub-agent reached iteration limit without a final answer]", False
finally:
# Restore parent ContextVar values so background tasks don't inherit stale subagent IDs
_sid_var.set(_prev_sid)
_model_var.set(_prev_model)
_uid_var.set(_prev_uid)
_role_var.set(_prev_role)
_uinfo_var.set(_prev_uinfo)
async def run_stream(
self,
session_id: str,
user_message: str,
images: list[str] | None = None,
display_message: str | None = None,
) -> AsyncGenerator[AgentEvent, None]:
"""
Streaming variant. Yields AgentEvent objects:
- ThinkingDelta / ThinkingEnd: reasoning chunks
- ToolEvent: tool call + result
- TextDelta / StreamEnd: final streamed response
- ContextCompressed: emitted by workers after compression
Args:
user_message: The text sent to the LLM (may contain injected hints).
display_message: Optional text shown to the user in the chat UI.
When omitted, user_message is used for both LLM context and display.
"""
session = await self._sessions.get(session_id)
if session is None:
raise SessionNotFound(session_id)
profile = self._profiles.get(session.profile_id)
tools = self._tool_list(profile.enabled_tools, profile.mcp_servers)
tool_schemas = [t.schema() for t in tools]
llm = self._get_backend(profile.llm_backend)
mem = await self._ctx_builder._memory_msg(user_id=session.user_id)
# Expose session_id and model to tools via ContextVar
from navi.tools.base import current_session_id as _sid_var, current_model as _model_var
_sid_token = _sid_var.set(session_id)
_model_var.set(profile.model)
# Pre-turn compression: if the last turn filled the context past the
# threshold, compress NOW before calling the LLM. This prevents the
# model from seeing an over-full context and generating gibberish
# (e.g. a "summary of the conversation" instead of a real answer).
if (
settings.context_compression_enabled
and session.context_token_count > 0
and should_compress(
session.context_token_count,
settings.ollama_num_ctx,
settings.context_compression_threshold,
)
):
event = await self._compress_session_context(
session=session,
llm=llm,
model=profile.model,
session_id=session_id,
reason="preturn",
)
if event:
yield event
display_text = display_message if display_message is not None else user_message
user_msg_display = Message(role="user", content=display_text, images=images or None,
created_at=datetime.now(timezone.utc))
user_msg_context = Message(role="user", content=user_message, images=images or None,
created_at=datetime.now(timezone.utc))
session.messages.append(user_msg_display)
session.context.append(user_msg_context)
# Persist user message immediately so it survives a client disconnect
# before the assistant reply is ready.
await self._sessions.save(session)
stop_event = current_stop_event.get()
_turn_start = time.monotonic()
_tool_call_count = 0
_subagent_tokens = 0
_prev_tokens = session.context_token_count # token baseline before this turn
# Planning phase — always runs on the first user message in a session;
# on subsequent messages uses the profile's planning_enabled flag.
# force_plan suppresses the DIRECT shortcut: first message is always forced,
# and planning_mandatory extends that to every subsequent message.
_is_first_message = sum(1 for m in session.messages if m.role == "user") == 1
_force_plan = _is_first_message or profile.planning_mandatory
if _is_first_message or profile.planning_enabled:
async for _ev in self._planning.run(session.context, profile, llm, mem, tool_schemas, messages=session.messages, force_plan=_force_plan):
if isinstance(_ev, AIHelperTokensUsed):
_subagent_tokens += _ev.total
elif isinstance(_ev, PlanningDebugData):
session.planning_logs.append(_ev.log)
# Cap to prevent unbounded DB growth on long sessions
_MAX_PLANNING_LOGS = 20
if len(session.planning_logs) > _MAX_PLANNING_LOGS:
session.planning_logs = session.planning_logs[-_MAX_PLANNING_LOGS:]
else:
yield _ev
# Anti-stall state — tracks consecutive iterations without progress.
# Two independent signals: no todo status change, and repeated identical tool calls.
_stall_no_todo = 0 # iterations since last todo status change
_stall_repeat_tools = 0 # iterations with identical tool calls as the previous turn
_prev_tool_sigs: frozenset[tuple[str, str]] = frozenset()
# Adaptive re-plan state — detect newly-failed todo steps and inject a
# re-planning prompt on the following iteration so the model revises its plan.
_known_failed: frozenset[tuple[int, str]] = frozenset()
_replan_msg: str | None = None
_injected_fact_ids: set[str] = set()
ctx_task = asyncio.create_task(self._ctx_builder._collect_context_injections(profile))
mem_facts_task = asyncio.create_task(
self._ctx_builder._memory_facts_msg(
user_message, user_id=session.user_id, injected_ids=_injected_fact_ids
)
)
ctx_injections = await ctx_task
mem_facts = await mem_facts_task
if mem_facts:
ctx_injections.append(mem_facts)
log.debug("agent.memory_facts_injected", session_id=session_id, facts_msg_length=len(mem_facts.content or ""))
else:
log.debug("agent.memory_facts_none", session_id=session_id)
# Tool-calling loop — uses stream_complete() for every turn so thinking
# is captured in real-time via ThinkingDelta/ThinkingEnd events.
for iteration in range(profile.max_iterations):
# Cooperative stop: check before each LLM call
if stop_event and stop_event.is_set():
await self._sessions.save(session)
yield StreamStopped()
return
if settings.context_compression_enabled and iteration > 0:
preflight_ctx = self._ctx_builder.build(
session.context,
profile,
mem,
extra_system=ctx_injections,
session_id=session_id,
)
estimated_tokens = self._estimate_context_tokens(preflight_ctx)
if should_compress(
estimated_tokens,
settings.ollama_num_ctx,
settings.context_compression_threshold,
):
event = await self._compress_session_context(
session=session,
llm=llm,
model=profile.model,
session_id=session_id,
reason="midturn",
keep_recent_messages=max(12, settings.context_keep_recent * 2),
)
if event:
yield event
_prev_tokens = 0
accumulated_text = ""
accumulated_thinking = ""
turn_tool_calls: list[ToolCallRequest] | None = None
thinking_active = False
context_tokens: int | None = None
built_ctx = self._ctx_builder.build(session.context, profile, mem,
iteration=iteration, max_iterations=profile.max_iterations,
extra_system=ctx_injections,
session_id=session_id)
if (
profile.goal_anchoring_enabled
and iteration > 0
and iteration % profile.goal_anchoring_interval == 0
):
built_ctx.append(self._ctx_builder._build_goal_anchor(session_id, user_message))
todo_msg = _todo_progress_message(session_id, first_iteration=(iteration == 0))
if todo_msg:
built_ctx.append(todo_msg)
# Snapshot todo state before this iteration (for stall detection after)
_todo_snapshot_before = _todo_status_snapshot(session_id)
# Adaptive re-plan: inject queued re-plan message from previous iteration
if profile.adaptive_replan_enabled and _replan_msg:
built_ctx.append(Message(role="system", content=_replan_msg))
_replan_msg = None
if profile.anti_stall_enabled and iteration > 0:
stalled = (
_stall_no_todo >= profile.anti_stall_threshold
or _stall_repeat_tools >= profile.anti_stall_threshold
)
if stalled:
reason = (
f"no todo progress for {_stall_no_todo} iterations"
if _stall_no_todo >= profile.anti_stall_threshold
else f"identical tool calls repeated {_stall_repeat_tools} times"
)
built_ctx.append(Message(
role="system",
content=(
f"[Anti-stall warning — {reason}] "
"You are repeating the same actions without making progress. "
"Stop and reconsider: change your approach, try a different tool, "
"mark the current step as failed and move on, or ask the user for guidance."
),
))
try:
self._check_context_size(built_ctx)
except ContextTooLargeError as e:
# Surface the error as a Navi response (not a raw system error) so the
# user sees a coherent message and the exchange is saved to history.
error_text = str(e)
assistant_msg = Message(role="assistant", content=error_text)
session.context.append(assistant_msg)
session.messages.append(assistant_msg)
await self._sessions.save(session)
yield TextDelta(delta=error_text)
yield StreamEnd(content=error_text)
return
async for chunk in _iter_stream_guarded(
llm.stream_complete(
built_ctx,
tools=tool_schemas if tools else None,
temperature=profile.temperature,
model=profile.model,
think=profile.think_enabled,
top_k=profile.top_k,
top_p=profile.top_p,
num_thread=profile.num_thread,
),
stop_event=stop_event,
first_chunk_timeout=settings.llm_stream_first_chunk_timeout,
chunk_timeout=settings.llm_stream_chunk_timeout,
):
# Cooperative stop: break cleanly so aclose() is called on the
# generator → Ollama stream closes gracefully, model stays in VRAM.
if stop_event and stop_event.is_set():
if thinking_active:
yield ThinkingEnd()
break
if chunk.prompt_tokens is not None or chunk.completion_tokens is not None:
context_tokens = (chunk.prompt_tokens or 0) + (chunk.completion_tokens or 0)
if chunk.thinking:
accumulated_thinking += chunk.thinking
if not thinking_active:
thinking_active = True
yield ThinkingDelta(delta=chunk.thinking)
elif chunk.delta:
if thinking_active:
thinking_active = False
yield ThinkingEnd()
accumulated_text += chunk.delta
yield TextDelta(delta=chunk.delta)
if chunk.tool_calls:
turn_tool_calls = chunk.tool_calls
if chunk.finish_reason and thinking_active:
thinking_active = False
yield ThinkingEnd()
# Stopped mid-stream — save partial response and exit
if stop_event and stop_event.is_set():
if accumulated_text:
session.messages.append(Message(
role="assistant", content=accumulated_text,
created_at=datetime.now(timezone.utc),
))
await self._sessions.save(session)
yield StreamStopped()
return
if not turn_tool_calls:
# Final response — text already streamed above
_elapsed = round(time.monotonic() - _turn_start, 1)
# Net tokens = marginal cost of this turn (delta from baseline) + subagent tokens
_net_tokens = max(0, (context_tokens or 0) - _prev_tokens) + _subagent_tokens
assistant_msg = Message(
role="assistant",
content=accumulated_text or None,
thinking=accumulated_thinking or None,
created_at=datetime.now(timezone.utc),
elapsed_seconds=_elapsed,
tool_call_count=_tool_call_count if _tool_call_count else None,
token_count=_net_tokens if _net_tokens else None,
)
session.messages.append(assistant_msg)
session.context.append(assistant_msg)
session.context_token_count = context_tokens or 0
await self._sessions.save(session)
yield StreamEnd(
full_content=accumulated_text,
context_tokens=context_tokens,
max_context_tokens=settings.ollama_num_ctx,
elapsed_seconds=_elapsed,
tool_call_count=_tool_call_count,
token_count=_net_tokens if _net_tokens else None,
)
for event in await self._run_workers(session, llm, profile.model, context_tokens):
yield event
return
# Tool calls turn — record to session and execute
assistant_msg = Message(
role="assistant",
content=accumulated_text or None,
thinking=accumulated_thinking or None,
tool_calls=turn_tool_calls,
)
session.messages.append(assistant_msg)
session.context.append(assistant_msg)
tool_map = {t.name: t for t in tools}
for tc in turn_tool_calls:
# 1. Announce immediately so the UI shows a pending card
yield ToolStarted(tool_name=tc.name, arguments=tc.arguments)
# 2. Create a sink queue for sub-agent events from this tool call.
# create_task() snapshots the current ContextVar values, so the
# task will inherit current_event_sink = sink.
sink: asyncio.Queue = asyncio.Queue()
sink_token = current_event_sink.set(sink)
result_holder: list = []
async def _run_with_sentinel(_tc=tc, _holder=result_holder, _sink=sink):
try:
_holder.append(await self._tool_executor._run_single_tool(_tc, tool_map))
except Exception as exc:
_holder.append(exc)
finally:
await _sink.put(_TOOL_DONE)
tool_task = asyncio.create_task(_run_with_sentinel())
current_event_sink.reset(sink_token) # outer ctx restored; task has its own copy
try:
# 3. Block on the sink until the sentinel arrives.
# Sub-agent ToolStarted/ToolEvent objects come through here in real time.
while True:
item = await sink.get()
if item is _TOOL_DONE:
break
if isinstance(item, SubagentComplete):
_subagent_tokens += item.token_count
_tool_call_count += item.tool_call_count
elif isinstance(item, AIHelperTokensUsed):
_subagent_tokens += item.total
else:
yield item
# 4. Unpack result or handle exception
r = result_holder[0] if result_holder else RuntimeError("tool task produced no result")
if isinstance(r, Exception):
# Infrastructure errors (LLMBackendError / LLMConnectionError) abort the turn;
# everything else becomes a failed tool result so the loop can continue.
if isinstance(r, (LLMBackendError, LLMConnectionError)):
raise r
log.warning("agent.tool_exception", tool=tc.name, error=str(r))
tool_event = ToolEvent(
tool_name=tc.name, arguments=tc.arguments,
result=f"Error: {r}", success=False,
)
msg = Message(role="tool", content=f"Error: {r}", tool_call_id=tc.id, name=tc.name, metadata={})
image_msg = None
else:
tool_event, msg, image_msg = r
# 5. Yield the completed ToolEvent and record in session
_tool_call_count += 1
yield tool_event
session.messages.append(msg)
session.context.append(msg)
if image_msg:
session.context.append(image_msg)
finally:
if not tool_task.done():
tool_task.cancel()
try:
await tool_task
except Exception:
pass
# 6. Cooperative stop: check after tool execution before next LLM call
if stop_event and stop_event.is_set():
await self._sessions.save(session)
yield StreamStopped()
return
# Update anti-stall counters after all tools in this iteration ran.
if profile.anti_stall_enabled:
# Todo progress signal
if _todo_status_snapshot(session_id) != _todo_snapshot_before:
_stall_no_todo = 0
else:
_stall_no_todo += 1
# Repeated tool call signal
cur_sigs = frozenset(
(tc.name, json.dumps(tc.arguments, sort_keys=True))
for tc in (turn_tool_calls or [])
)
if cur_sigs and cur_sigs == _prev_tool_sigs:
_stall_repeat_tools += 1
else:
_stall_repeat_tools = 0
_prev_tool_sigs = cur_sigs
# Adaptive re-plan: detect steps that were newly marked failed this iteration.
if profile.adaptive_replan_enabled:
current_failed = _todo_failed_steps(session_id)
new_failures = current_failed - _known_failed
_known_failed = current_failed
if new_failures:
failed_labels = ", ".join(
f'step {idx} ("{text}")'
for idx, text in sorted(new_failures)
)
_replan_msg = (
f"[Adaptive re-plan] {failed_labels} just failed. "
"Before continuing, revise your plan with the todo tool: either replace the remaining "
"pending steps or mark failed/skipped steps with validation. Then continue execution "
"with an approach that accounts for what went wrong."
)
log.info("agent.adaptive_replan_queued", failures=len(new_failures),
session_id=session_id)
# 7. If switch_profile was called this iteration, reload profile + tools.
# switch_profile updates the DB but run_stream() holds a local session
# object — without this check the final save would overwrite the change
# and the next LLM call would still use the old tool schemas.
fresh = await self._sessions.get(session_id)
if fresh and fresh.profile_id != session.profile_id:
session.profile_id = fresh.profile_id
profile = self._profiles.get(session.profile_id)
tools = self._tool_list(profile.enabled_tools, profile.mcp_servers)
tool_schemas = [t.schema() for t in tools]
llm = self._get_backend(profile.llm_backend)
log.info(
"agent.profile_reloaded",
session_id=session_id,
new_profile=session.profile_id,
)
await self._sessions.save(session)
raise MaxIterationsReached(profile.max_iterations)
# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------
async def _run_workers(
    self,
    session,
    llm: LLMBackend,
    model: str,
    context_tokens: int | None,
) -> list[AgentEvent]:
    """Run every registered worker, one after another, and gather their events.

    All workers share a single WorkerContext built from the session id, the
    latest context token count, the configured window size
    (``settings.ollama_num_ctx``), the LLM backend/model of the turn, the
    summary temperature (``settings.context_summary_temperature``) and the
    session store.

    A worker that raises is logged as ``agent.worker_failed`` and skipped, so
    one faulty worker never prevents the remaining workers from running.

    Args:
        session: The session the turn just finished on.
        llm: Backend the workers may use for their own LLM calls.
        model: Model name the workers should use.
        context_tokens: Token count reported for the turn, if any.

    Returns:
        The concatenation of every successful worker's ``events``, in worker
        order.
    """
    from navi.workers.base import WorkerContext

    shared_ctx = WorkerContext(
        session_id=session.id,
        context_tokens=context_tokens,
        max_context_tokens=settings.ollama_num_ctx,
        llm=llm,
        model=model,
        temperature=settings.context_summary_temperature,
        session_store=self._sessions,
    )
    collected: list[AgentEvent] = []
    for current in self._workers:
        try:
            # Reading `.events` stays inside the try so a malformed result is
            # treated like any other worker failure.
            collected.extend((await current.run(session, shared_ctx)).events)
        except Exception:
            log.warning("agent.worker_failed",
                        worker=type(current).__name__, exc_info=True)
    return collected
async def _compress_session_context(
    self,
    session,
    llm: LLMBackend,
    model: str,
    session_id: str,
    reason: str,
    keep_recent_messages: int | None = None,
) -> ContextCompressed | None:
    """Summarise the LLM-facing context of *session* and persist the outcome.

    ``compress_context`` is called with the configured summary temperature,
    keep-recent count and summary token limit.

    * If it raises, the failure is logged as ``agent.context_compress_failed``
      and the session is left untouched.
    * If it returns ``None``, nothing is changed either.
    * Otherwise ``session.context`` is replaced by the compressed list and
      ``session.context_token_count`` is reset to 0. A ``role="system"``
      message flagged ``is_compression`` and holding the summary is appended
      to ``session.messages`` (the display history), and the session is saved.

    Args:
        session: Session whose ``context`` should be compressed.
        llm: Backend used to produce the summary.
        model: Model name used for the summary.
        session_id: Id used in log records.
        reason: Free-form reason recorded in log records.
        keep_recent_messages: Optional override forwarded to ``compress_context``.

    Returns:
        A ContextCompressed event for the UI, or None when the context was not
        changed.
    """
    try:
        outcome = await compress_context(
            context=session.context,
            llm=llm,
            model=model,
            temperature=settings.context_summary_temperature,
            keep_recent=settings.context_keep_recent,
            max_tokens=settings.context_summary_max_tokens,
            keep_recent_messages=keep_recent_messages,
        )
    except Exception:
        log.warning(
            "agent.context_compress_failed",
            session_id=session_id,
            reason=reason,
            exc_info=True,
        )
        return None
    if outcome is None:
        return None

    compacted, summary = outcome
    size_before = len(session.context)

    session.context = compacted
    # Unknown until an LLM call reports a fresh count.
    session.context_token_count = 0
    compression_note = Message(
        role="system",
        is_compression=True,
        content=summary,
    )
    session.messages.append(compression_note)
    await self._sessions.save(session)

    size_after = len(compacted)
    log.info(
        "agent.context_compress",
        session_id=session_id,
        reason=reason,
        before=size_before,
        after=size_after,
    )
    return ContextCompressed(
        messages_before=size_before,
        messages_after=size_after,
        summary=summary,
        context_tokens=session.context_token_count,
        max_context_tokens=settings.ollama_num_ctx,
    )
@staticmethod
def _estimate_context_tokens(context: "list[Message]") -> int:
    """Conservative local token estimate, used before the next LLM call returns real counts.

    Text is estimated at ~4 characters per token. Characters are summed over
    every message's ``content``, with ``None`` counted as empty. Every
    attached image adds a flat 500 tokens, a rough vision-model allowance.

    Args:
        context: Messages to estimate.

    Returns:
        Estimated token count (0 for an empty list).
    """
    chars = sum(len(m.content or "") for m in context)
    # Count images, not messages-with-images. A message carrying several
    # images would otherwise be under-counted, which defeats the purpose of a
    # conservative estimate.
    # NOTE(review): assumes Message.images is a sequence with one entry per image.
    image_count = sum(len(m.images) for m in context if m.images)
    return chars // 4 + image_count * 500
def _tool_list(
    self,
    enabled: list[str],
    mcp_servers: dict[str, list[str]] | None = None,
) -> list[Tool]:
    """Resolve the Tool objects to offer the LLM for a profile.

    Tool names are collected in this order, each name at most once:

    1. ``enabled`` (the profile's own list);
    2. the names returned by ``_load_user_enabled_tools()``;
    3. MCP tools from ``mcp_servers`` (server name -> list of group names),
       expanded only when an MCP manager is configured. A ``"*"`` group
       takes every registered tool whose name starts with ``mcp_<server>_``.
       Any other group is resolved through ``self._mcp_manager.resolve_group``
       and mapped to ``mcp_<server>_<tool>``.

    Each collected name is then looked up in the tool registry. Names the
    registry cannot resolve are skipped (logged at debug level) so that a
    stale or unavailable tool never breaks the turn.

    Args:
        enabled: Tool names enabled by the profile.
        mcp_servers: Optional mapping of MCP server name to group names.

    Returns:
        The resolved tools, in collection order.
    """
    # A dict acts as an insertion-ordered set: O(1) de-duplication, and a name
    # repeated inside ``enabled`` yields one tool schema, not two.
    names: dict[str, None] = dict.fromkeys(enabled)
    for name in _load_user_enabled_tools():
        names.setdefault(name)

    # Expand MCP server groups into concrete tool names.
    if mcp_servers and self._mcp_manager:
        for server_name, groups in mcp_servers.items():
            if "*" in groups:
                # NOTE(review): a plain prefix match also picks up tools of any
                # other server whose name starts with "<server_name>_".
                prefix = f"mcp_{server_name}_"
                for tool in self._tools.all():
                    if tool.name.startswith(prefix):
                        names.setdefault(tool.name)
            else:
                for group_name in groups:
                    for tool_name in self._mcp_manager.resolve_group(server_name, group_name):
                        names.setdefault(f"mcp_{server_name}_{tool_name}")

    result = []
    for name in names:
        try:
            result.append(self._tools.get(name))
        except Exception as exc:
            # Best-effort: skip the tool, but leave a trace instead of
            # silently dropping it.
            log.debug("agent.tool_unavailable", tool=name, error=str(exc))
    return result
def _get_backend(self, backend_key: str) -> LLMBackend:
    """Return the LLM backend registered under *backend_key*.

    Delegates to the backend registry's ``get``. Behaviour for an unknown key
    is whatever that registry does (not visible here).
    """
    registry = self._backends
    return registry.get(backend_key)
def _check_context_size(self, context: list[Message]) -> None:
    """Raise ContextTooLargeError before an LLM call if the context is dangerously large.

    Sizes come from ``_estimate_context_tokens``, a conservative,
    character-based estimate (~4 chars per token for text, plus a fixed
    allowance for images). The estimate is checked against the budget left
    for input once headroom for the model's reply is reserved:

        available_for_input = ollama_num_ctx - output_reserve

    Here ``output_reserve`` is ``settings.output_reserve_tokens``. Because the
    whole context is measured, sessions whose history already fills much of
    the window are handled correctly.

    Args:
        context: Messages about to be sent to the LLM. The last message is
            treated as the newly added content when reporting an overflow.

    Raises:
        ContextTooLargeError: if the estimated size exceeds the input budget.
            The message breaks the total down into the size of the new
            content and the budget remaining after the earlier history.
    """
    if not context:
        return
    output_reserve = settings.output_reserve_tokens
    # Reuse the shared estimator so this check and other token estimates in
    # the agent cannot drift apart.
    total = self._estimate_context_tokens(context)
    available = settings.ollama_num_ctx - output_reserve
    if total <= available:
        return
    # The existing/new breakdown is only needed for the error message.
    existing = self._estimate_context_tokens(context[:-1])
    new = self._estimate_context_tokens(context[-1:])
    remaining = available - existing
    raise ContextTooLargeError(
        f"Context too large: new content is ~{new:,} estimated tokens, "
        f"but only ~{max(0, remaining):,} tokens are available "
        f"(window {settings.ollama_num_ctx:,}, already used ~{existing:,}, "
        f"output_reserve {output_reserve:,}). "
        "Split the file into smaller parts or delegate to a subagent."
    )