"""
Agent: the tool-calling loop.
Flow:
1. Receive user message, load session + profile
2. Build tool schemas from profile's enabled_tools
3. Loop (up to max_iterations):
a. Call LLM with session.context (may be compressed) + tool schemas
b. If finish_reason == "stop" -> stream final response
c. If finish_reason == "tool_calls" -> execute tools, append to both
session.messages (display history) and session.context (LLM context)
4. After StreamEnd: run workers sequentially (e.g. context compression)
session.messages — full display history, never compressed
session.context — what the LLM sees; workers may compress this
"""
import asyncio
import base64
import io
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import TYPE_CHECKING, AsyncGenerator
import structlog
from PIL import Image
from navi.config import settings
from navi.exceptions import ContextTooLargeError, LLMBackendError, LLMConnectionError, MaxIterationsReached, SessionNotFound
from navi.llm.base import LLMBackend, Message, ToolCallRequest
from navi.tools._internal.base import Tool, ToolContext, current_event_sink, current_stop_event, current_user_role, current_user_info
from .agent_run_context import AgentTurnContext, StreamState
from .anti_stall import AntiStallMonitor
from .compressor import ContextCompressor, should_compress
from .context_builder import ContextBuilder
from .planning import PlanningEngine
from .stream_guard import _iter_stream_guarded
from .subagent_runner import SubAgentRunner
from .tool_utils import build_tool_list, load_user_enabled_tools
from .events import (
AgentEvent,
AIHelperTokensUsed,
CompressionStarted,
ContextCompressed,
PlanningDebugData,
PlanningStatus,
PlanReady,
StreamEnd,
StreamStopped,
SubagentComplete,
TextDelta,
ThinkingDelta,
ThinkingEnd,
ToolEvent,
ToolStarted,
TurnThinking,
)
from .registry import BackendRegistry, ProfileRegistry, ToolRegistry
from .session import SessionStore
from .tool_executor import ToolExecutor
if TYPE_CHECKING:
from navi.context_providers._loader import ContextProviderRegistry
from navi.memory.store import MemoryStore
from navi.workers.base import Worker, WorkerContext
# Module-level structlog logger shared by everything in this file.
log = structlog.get_logger()
# Sentinel: placed in the event sink by the tool wrapper to signal completion.
# It is a unique object so it can be recognised with an identity check (`is`).
_TOOL_DONE = object()
async def _todo_progress_message(session_id: str, *, first_iteration: bool = False) -> "Message | None":
    """Return the todo-progress reminder message for *session_id*.

    Thin async wrapper around ``navi.tools.todo.get_progress_message``; the
    import happens inside the function body rather than at module import time.
    Whatever that helper returns (a message or ``None``) is passed through.
    """
    from navi.tools.todo import get_progress_message as _build_reminder

    reminder = await _build_reminder(session_id, first_iteration=first_iteration)
    return reminder
class Agent:
def __init__(
    self,
    session_store: "SessionStore | None",
    profile_registry: ProfileRegistry,
    tool_registry: ToolRegistry,
    backend_registry: BackendRegistry,
    workers: list["Worker"] | None = None,
    memory_store: "MemoryStore | None" = None,
    cp_registry: "ContextProviderRegistry | None" = None,
    mcp_manager=None,
) -> None:
    """Store the injected collaborators and build the internal helpers.

    The helpers (context builder, tool executor, planning engine, compressor,
    sub-agent runner) are created once here and shared by every run.
    """
    # Helper objects are built first, in the same order as their dependencies:
    # the planning engine and the sub-agent runner both reuse the context builder.
    ctx_builder = ContextBuilder(
        profile_registry=profile_registry,
        memory_store=memory_store,
        cp_registry=cp_registry,
        mcp_manager=mcp_manager,
    )
    tool_executor = ToolExecutor(tool_registry)
    planning = PlanningEngine(ctx_builder)
    compressor = ContextCompressor()
    subagent = SubAgentRunner(
        profile_registry=profile_registry,
        tool_registry=tool_registry,
        backend_registry=backend_registry,
        ctx_builder=ctx_builder,
        compressor=compressor,
        planning=planning,
        tool_executor=tool_executor,
        session_store=session_store,
        mcp_manager=mcp_manager,
    )

    # Injected dependencies, kept as given.
    self._sessions = session_store
    self._profiles = profile_registry
    self._tools = tool_registry
    self._backends = backend_registry
    self._workers: list["Worker"] = workers or []
    self._memory = memory_store
    self._cp_registry = cp_registry
    self._mcp_manager = mcp_manager

    # Internally owned helpers.
    self._ctx_builder = ctx_builder
    self._tool_executor = tool_executor
    self._planning = planning
    self._compressor = compressor
    self._subagent = subagent
# ------------------------------------------------------------------
# Public interface
# ------------------------------------------------------------------
async def run(self, session_id: str, user_message: str, images: list[str] | None = None, files: list[dict] | None = None, is_recall: bool = False) -> str:
    """Non-streaming entry point: drive ``run_stream`` and return the final text.

    Returns the ``full_content`` of the last ``StreamEnd`` event seen, or an
    empty string when no such event (or an empty one) was produced.
    """
    final_text = ""
    stream = self.run_stream(session_id, user_message, images=images, files=files, is_recall=is_recall)
    # The stream is drained to the end on purpose: run_stream may still yield
    # events after StreamEnd, and those must be consumed for it to finish.
    async for ev in stream:
        if not isinstance(ev, StreamEnd):
            continue
        final_text = ev.full_content or ""
    return final_text
async def run_ephemeral(
    self,
    user_message: str,
    profile_id: str,
    max_iterations: int = 40,
    exclude_tools: list[str] | None = None,
    briefing: str | None = None,
    custom_system_prompt: str | None = None,
    inherit_system_prompt: bool = False,
    context_transfer: str | None = None,
    parent_session_id: str | None = None,
    timeout_seconds: float = 300.0,
) -> tuple[str, bool]:
    """Run a sub-agent by delegating to the ``SubAgentRunner``.

    Every argument is forwarded unchanged, by keyword, and the runner's
    result is returned as-is.
    """
    forwarded = {
        "user_message": user_message,
        "profile_id": profile_id,
        "max_iterations": max_iterations,
        "exclude_tools": exclude_tools,
        "briefing": briefing,
        "custom_system_prompt": custom_system_prompt,
        "inherit_system_prompt": inherit_system_prompt,
        "context_transfer": context_transfer,
        "parent_session_id": parent_session_id,
        "timeout_seconds": timeout_seconds,
    }
    outcome = await self._subagent.run(**forwarded)
    return outcome
async def run_stream(
    self,
    session_id: str,
    user_message: str,
    images: list[str] | None = None,
    display_message: str | None = None,
    files: list[dict] | None = None,
    is_recall: bool = False,
) -> AsyncGenerator[AgentEvent, None]:
    """
    Streaming variant. Yields AgentEvent objects:
    - CompressionStarted / ContextCompressed: pre-turn or mid-turn compression
    - events forwarded from the planning engine
    - ThinkingDelta / ThinkingEnd: reasoning chunks
    - ToolStarted / ToolEvent: tool call + result
    - TextDelta / StreamEnd: final streamed response
    - StreamStopped: the cooperative stop event was set
    - events returned by the workers, after StreamEnd

    Args:
        session_id: Id of the session to load and update.
        user_message: The text sent to the LLM (may contain injected hints).
        images: Optional base64-encoded images attached to the message.
        display_message: Optional text shown to the user in the chat UI.
            When omitted, user_message is used for both LLM context and display.
        files: Optional file descriptors attached to the message.
        is_recall: Stored on the user messages created for this turn.

    Raises:
        SessionNotFound: when the session store has no such session.
        MaxIterationsReached: when the loop runs out of iterations without
            producing a final (tool-call free) response.
    """
    session = await self._sessions.get(session_id)
    if session is None:
        raise SessionNotFound(session_id)
    profile = self._profiles.get(session.profile_id)
    tools = self._tool_list(profile.get_agent_tools())
    tool_schemas = [t.schema() for t in tools]
    llm = self._get_backend(profile.llm_backend)
    mem = await self._ctx_builder._memory_msg(user_id=session.user_id)

    # Expose session_id and model to tools via ContextVar.
    # The reset tokens are not kept: nothing in this method resets the variables.
    from navi.tools._internal.base import current_session_id as _sid_var, current_model as _model_var
    _sid_var.set(session_id)
    _model_var.set(profile.model)

    # Pre-turn compression: if the last turn filled the context past the
    # threshold, compress NOW before calling the LLM. This prevents the
    # model from seeing an over-full context and generating gibberish
    # (e.g. a "summary of the conversation" instead of a real answer).
    async for _ev in self._compression_events_preturn(session, llm, profile, session_id):
        yield _ev

    display_text = display_message if display_message is not None else user_message
    user_msg_display = Message(role="user", content=display_text, images=images or None,
                               files=files or None, created_at=datetime.now(timezone.utc),
                               is_recall=is_recall, is_context=False)

    # Image token budgeting: fit as many images as possible into the LLM context.
    # Overflow images are saved to the session directory so Navi can view them
    # later via image_view if needed.
    images_for_context, context_content = self._fit_images_to_context(
        session, session_id, user_message, images
    )
    user_msg_context = Message(role="user", content=context_content, images=images_for_context or None,
                               files=files or None, created_at=datetime.now(timezone.utc),
                               is_recall=is_recall, is_display=False)

    # Must be decided BEFORE appending: each turn adds two role="user" entries
    # (display copy + context copy) to session.messages, so counting afterwards
    # can never yield exactly one.
    _is_first_message = not any(m.role == "user" for m in session.messages)

    session.messages.append(user_msg_display)
    session.messages.append(user_msg_context)
    session.context.append(user_msg_context)
    # Persist user message immediately so it survives a client disconnect
    # before the assistant reply is ready.
    await self._sessions.save(session)

    stop_event = current_stop_event.get()
    turn_ctx = AgentTurnContext(turn_start=time.monotonic())

    # Planning phase — always runs on the first user message in a session;
    # on subsequent messages uses the profile's planning_enabled flag.
    # force_plan suppresses the DIRECT shortcut: first message is always forced,
    # and planning_mandatory extends that to every subsequent message.
    _force_plan = _is_first_message or profile.planning_mandatory
    if _is_first_message or profile.planning_enabled:
        # Cap to prevent unbounded DB growth on long sessions
        _MAX_PLANNING_LOGS = 20
        async for _ev in self._planning.run(session.context, profile, llm, mem, tool_schemas, messages=session.messages, force_plan=_force_plan):
            if isinstance(_ev, AIHelperTokensUsed):
                turn_ctx.subagent_tokens += _ev.completion_tokens
            elif isinstance(_ev, PlanningDebugData):
                session.planning_logs.append(_ev.log)
                if len(session.planning_logs) > _MAX_PLANNING_LOGS:
                    session.planning_logs = session.planning_logs[-_MAX_PLANNING_LOGS:]
            else:
                yield _ev

    # Collect context injections and memory facts concurrently. gather() keeps
    # both under one await, so a failure in the first cannot leave the second
    # running as an orphaned task with an unretrieved exception.
    ctx_injections, mem_facts = await asyncio.gather(
        self._ctx_builder._collect_context_injections(profile),
        self._ctx_builder._memory_facts_msg(
            user_message, user_id=session.user_id, injected_ids=turn_ctx.injected_fact_ids
        ),
    )
    if mem_facts:
        ctx_injections.append(mem_facts)
        log.debug("agent.memory_facts_injected", session_id=session_id, facts_msg_length=len(mem_facts.content or ""))
    else:
        log.debug("agent.memory_facts_none", session_id=session_id)

    anti_stall = AntiStallMonitor(profile)
    if profile.anti_stall_enabled or profile.adaptive_replan_enabled:
        await anti_stall.init(session_id)

    # Tool-calling loop — uses stream_complete() for every turn so thinking
    # is captured in real-time via ThinkingDelta/ThinkingEnd events.
    for iteration in range(profile.max_iterations):
        # Cooperative stop: check before each LLM call
        if stop_event and stop_event.is_set():
            await self._sessions.save(session)
            yield StreamStopped()
            return

        async for _ev in self._compression_events_midturn(session, llm, profile, session_id, iteration, ctx_injections, mem):
            yield _ev

        state = StreamState()
        built_ctx = self._ctx_builder.build(session.context, profile, mem,
                                            iteration=iteration, max_iterations=profile.max_iterations,
                                            extra_system=ctx_injections,
                                            session_id=session_id)
        if (
            profile.goal_anchoring_enabled
            and iteration > 0
            and iteration % profile.goal_anchoring_interval == 0
        ):
            built_ctx.append(await self._ctx_builder._build_goal_anchor(session_id, user_message))
        todo_msg = await _todo_progress_message(session_id, first_iteration=(iteration == 0))
        if todo_msg:
            built_ctx.append(todo_msg)
        # Anti-stall / adaptive re-plan: inject intervention message if needed
        _stall_msg = await anti_stall.pre_turn(session_id, iteration)
        if _stall_msg:
            built_ctx.append(_stall_msg)

        try:
            self._compressor.check_context_size(built_ctx)
        except ContextTooLargeError as e:
            # Surface the error as a Navi response (not a raw system error) so the
            # user sees a coherent message and the exchange is saved to history.
            error_text = str(e)
            assistant_msg = Message(role="assistant", content=error_text)
            session.context.append(assistant_msg)
            session.messages.append(assistant_msg)
            await self._sessions.save(session)
            yield TextDelta(delta=error_text)
            # Same keyword as the normal final-response StreamEnd below, which is
            # also the attribute run() reads back.
            yield StreamEnd(full_content=error_text)
            return

        async for _ev in self._consume_stream(
            _iter_stream_guarded(
                llm.stream_complete(
                    built_ctx,
                    tools=tool_schemas if tools else None,
                    temperature=profile.temperature,
                    model=profile.model,
                    think=profile.think_enabled,
                    top_k=profile.top_k,
                    top_p=profile.top_p,
                    num_thread=profile.num_thread,
                ),
                stop_event=stop_event,
                first_chunk_timeout=settings.llm_stream_first_chunk_timeout,
                chunk_timeout=settings.llm_stream_chunk_timeout,
            ),
            stop_event,
            turn_ctx,
            state,
        ):
            yield _ev

        # Stopped mid-stream — save partial response and exit
        if stop_event and stop_event.is_set():
            if state.accumulated_text:
                # Display-only: the partial text is not added to session.context.
                session.messages.append(Message(
                    role="assistant", content=state.accumulated_text,
                    created_at=datetime.now(timezone.utc),
                    is_context=False,
                ))
            await self._sessions.save(session)
            yield StreamStopped()
            return

        turn_tool_calls = state.turn_tool_calls
        if not turn_tool_calls:
            # Final response — text already streamed above
            _elapsed = round(time.monotonic() - turn_ctx.turn_start, 1)
            # Net tokens = accumulated across all iterations + subagent tokens
            _net_tokens = turn_ctx.turn_tokens + turn_ctx.subagent_tokens
            assistant_msg = Message(
                role="assistant",
                content=state.accumulated_text or None,
                thinking=state.accumulated_thinking or None,
                created_at=datetime.now(timezone.utc),
                elapsed_seconds=_elapsed,
                tool_call_count=turn_ctx.tool_call_count if turn_ctx.tool_call_count else None,
                token_count=_net_tokens if _net_tokens else None,
            )
            session.messages.append(assistant_msg)
            session.context.append(assistant_msg)
            session.context_token_count = state.context_tokens or 0
            await self._sessions.save(session)
            yield StreamEnd(
                full_content=state.accumulated_text,
                context_tokens=state.context_tokens,
                max_context_tokens=settings.ollama_num_ctx,
                elapsed_seconds=_elapsed,
                tool_call_count=turn_ctx.tool_call_count,
                token_count=_net_tokens if _net_tokens else None,
            )
            # Workers run after StreamEnd; their events are forwarded last.
            for event in await self._run_workers(session, llm, profile.model, state.context_tokens):
                yield event
            return

        # Tool calls turn — record to session and execute
        assistant_msg = Message(
            role="assistant",
            content=state.accumulated_text or None,
            thinking=state.accumulated_thinking or None,
            tool_calls=turn_tool_calls,
        )
        session.messages.append(assistant_msg)
        session.context.append(assistant_msg)
        tool_ctx = ToolContext(
            session_id=session_id,
            event_sink=None,  # set per-tool inside _execute_tools_with_sink
            stop_event=stop_event,
            model=profile.model,
            user_id=session.user_id,
            user_role=current_user_role.get(),
            user_info=current_user_info.get(),
        )
        _stop_already_emitted = False
        async for _ev in self._execute_tools_with_sink(turn_tool_calls, tools, turn_ctx, session, stop_event, tool_ctx):
            if isinstance(_ev, StreamStopped):
                _stop_already_emitted = True
            yield _ev
        if _stop_already_emitted:
            # The tool runner saved the session and emitted StreamStopped itself
            # when it interrupted a tool; emitting it again would duplicate the event.
            return

        # Cooperative stop: check after tool execution before next LLM call
        if stop_event and stop_event.is_set():
            await self._sessions.save(session)
            yield StreamStopped()
            return

        # Update anti-stall counters and adaptive-replan state
        await anti_stall.post_turn(session_id, turn_tool_calls)

        # If switch_profile was called this iteration, reload profile + tools.
        # switch_profile updates the DB but run_stream() holds a local session
        # object — without this check the final save would overwrite the change
        # and the next LLM call would still use the old tool schemas.
        fresh = await self._sessions.get(session_id)
        if fresh and fresh.profile_id != session.profile_id:
            session.profile_id = fresh.profile_id
            profile = self._profiles.get(session.profile_id)
            tools = self._tool_list(profile.get_agent_tools())
            tool_schemas = [t.schema() for t in tools]
            llm = self._get_backend(profile.llm_backend)
            log.info(
                "agent.profile_reloaded",
                session_id=session_id,
                new_profile=session.profile_id,
            )

    # Loop exhausted without a final response: persist what we have, then fail.
    await self._sessions.save(session)
    raise MaxIterationsReached(profile.max_iterations)

def _fit_images_to_context(
    self,
    session,
    session_id: str,
    user_message: str,
    images: list[str] | None,
) -> "tuple[list[str] | None, str]":
    """Split *images* into those sent to the LLM and those saved to disk.

    Returns ``(images_for_context, context_content)``. When every image fits
    (or there are none) the inputs are returned unchanged. Otherwise the
    overflow images are written as JPEGs into the session's file directory and
    a note listing the saved file names is appended to the message text.
    Saving is best-effort: a failure is logged and the images saved so far
    are still reported.
    """
    if not images:
        return images, user_message

    # Budget: 80% of the configured context window minus what the context
    # already uses, with every image costed at a flat 500 tokens.
    current_tokens = self._compressor.estimate_context_tokens(session.context)
    available_tokens = int(settings.ollama_num_ctx * 0.8) - current_tokens
    max_images = max(0, available_tokens // 500)
    if max_images >= len(images):
        return images, user_message

    overflow = images[max_images:]
    saved_names: list[str] = []
    try:
        session_dir = Path(settings.session_files_dir) / session_id
        session_dir.mkdir(parents=True, exist_ok=True)
        for idx, b64 in enumerate(overflow):
            raw = base64.b64decode(b64)
            img = Image.open(io.BytesIO(raw))
            img = img.convert("RGB")
            w, h = img.size
            # Downscale so the longest side is at most 1024 px.
            if w > 1024 or h > 1024:
                ratio = 1024 / max(w, h)
                img = img.resize((int(w * ratio), int(h * ratio)), Image.LANCZOS)
            # NOTE(review): names restart at uploaded_0.jpg on every call, so a
            # later turn with overflow images overwrites earlier files — confirm
            # whether that is intended.
            name = f"uploaded_{idx}.jpg"
            path = session_dir / name
            img.save(path, format="JPEG", quality=85, optimize=True)
            saved_names.append(name)
    except Exception:
        # Best-effort: never fail the turn because an attachment could not be
        # stored, but leave a trace instead of swallowing the error silently.
        log.warning(
            "agent.overflow_image_save_failed",
            session_id=session_id,
            saved=len(saved_names),
            overflow=len(overflow),
            exc_info=True,
        )

    context_content = user_message
    if saved_names:
        context_content += (
            f"\n\n[Additional images saved to session directory: {', '.join(saved_names)}]"
        )
    return images[:max_images], context_content
# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------
async def _run_workers(
    self,
    session,
    llm: LLMBackend,
    model: str,
    context_tokens: int | None,
) -> list[AgentEvent]:
    """Run every configured worker in order and return the events they produced.

    A worker that raises is logged as a warning and skipped; the remaining
    workers still run.
    """
    from navi.workers.base import WorkerContext

    # One shared context object is handed to all workers.
    worker_ctx = WorkerContext(
        session_id=session.id,
        context_tokens=context_tokens,
        max_context_tokens=settings.ollama_num_ctx,
        llm=llm,
        model=model,
        temperature=settings.context_summary_temperature,
        session_store=self._sessions,
    )
    collected: list[AgentEvent] = []
    for current in self._workers:
        try:
            outcome = await current.run(session, worker_ctx)
            collected.extend(outcome.events)
        except Exception:
            log.warning("agent.worker_failed",
                        worker=type(current).__name__, exc_info=True)
    return collected
async def _do_compress_and_save(
    self,
    session,
    llm: LLMBackend,
    model: str,
    session_id: str,
    reason: str,
    keep_recent_messages: int | None = None,
) -> ContextCompressed | None:
    """Compress ``session.context``, persist the session and report the change.

    Returns ``None`` when the compressor produced nothing; otherwise a
    ``ContextCompressed`` event describing the before/after sizes.
    """
    size_before = len(session.context)
    outcome = await self._compressor.compress_session(
        context=session.context,
        llm=llm,
        model=model,
        temperature=settings.context_summary_temperature,
        keep_recent=settings.context_keep_recent,
        max_tokens=settings.context_summary_max_tokens,
        keep_recent_messages=keep_recent_messages,
    )
    if outcome is None:
        return None
    compressed_ctx, summary_text = outcome

    # Flag every non-system message that did not survive into the new context.
    # Membership is by object identity, not equality.
    surviving_ids = {id(m) for m in compressed_ctx}
    for entry in session.messages:
        if entry.role == "system" or id(entry) in surviving_ids:
            continue
        entry.is_context = False

    # The summary returned by the compressor must also live in messages so
    # save() writes it to the normalized table, but it is not displayed.
    summary_msg = None
    for candidate in compressed_ctx:
        if candidate.is_summary:
            summary_msg = candidate
            break
    if summary_msg and summary_msg not in session.messages:
        summary_msg.is_display = False
        session.messages.append(summary_msg)

    # UI marker showing that compression happened
    marker = Message(
        role="system",
        content=summary_text,
        is_compression=True,
        is_context=False,
    )
    session.messages.append(marker)

    session.context = compressed_ctx
    session.context_token_count = self._compressor.estimate_context_tokens(compressed_ctx)
    await self._sessions.save(session)

    # Archive old messages if the hot table exceeds the configured window.
    window = settings.session_messages_window
    if window > 0 and session.db_next_sequence > window:
        threshold = session.db_next_sequence - window
        archived = await self._sessions.archive_old_messages(session_id, threshold)
        if archived > 0:
            log.info(
                "agent.archive_messages",
                session_id=session_id,
                archived=archived,
                threshold=threshold,
            )

    size_after = len(compressed_ctx)
    log.info(
        "agent.context_compress",
        session_id=session_id,
        reason=reason,
        before=size_before,
        after=size_after,
    )
    return ContextCompressed(
        messages_before=size_before,
        messages_after=size_after,
        summary=summary_text,
        context_tokens=session.context_token_count,
        max_context_tokens=settings.ollama_num_ctx,
    )
def _tool_list(
    self,
    scope: "ToolScopeConfig",
) -> list[Tool]:
    """Resolve a tool scope into concrete tools via ``build_tool_list``.

    The scope's ``native`` and ``mcp`` entries are looked up against the
    agent's tool registry and MCP manager.
    """
    native_names, mcp_names = scope.native, scope.mcp
    resolved = build_tool_list(native_names, mcp_names, self._tools, self._mcp_manager)
    return resolved
def _get_backend(self, backend_key: str) -> LLMBackend:
    """Look up the LLM backend registered under *backend_key*."""
    backend = self._backends.get(backend_key)
    return backend
async def _compression_events_preturn(self, session, llm, profile, session_id):
    """Compress the context before the turn starts, if it is over the threshold.

    Async generator. Yields nothing when compression is disabled, when the
    context holds two messages or fewer, or when the estimated size is below
    the threshold. Otherwise yields ``CompressionStarted`` followed by the
    ``ContextCompressed`` event, if the compression produced one.
    """
    if not settings.context_compression_enabled or len(session.context) <= 2:
        return

    # Estimate once and reuse the figure for both the threshold check and the
    # event payload; the context is not modified in between.
    estimated_tokens = self._compressor.estimate_context_tokens(session.context)
    if not should_compress(
        estimated_tokens,
        settings.ollama_num_ctx,
        settings.context_compression_threshold,
    ):
        return

    yield CompressionStarted(
        context_tokens=estimated_tokens,
        max_context_tokens=settings.ollama_num_ctx,
    )
    event = await self._do_compress_and_save(
        session=session,
        llm=llm,
        model=profile.model,
        session_id=session_id,
        reason="preturn",
    )
    if event:
        yield event
async def _compression_events_midturn(self, session, llm, profile, session_id, iteration, ctx_injections, mem):
    """Compress the context between loop iterations when it has grown too large.

    Async generator. Does nothing on the first iteration or when compression
    is disabled. The size is estimated on the context as the builder would
    assemble it (including the extra system injections), not on the raw
    ``session.context``.
    """
    if not (settings.context_compression_enabled and iteration > 0):
        return

    assembled = self._ctx_builder.build(
        session.context,
        profile,
        mem,
        extra_system=ctx_injections,
        session_id=session_id,
    )
    estimated_tokens = self._compressor.estimate_context_tokens(assembled)
    over_threshold = should_compress(
        estimated_tokens,
        settings.ollama_num_ctx,
        settings.context_compression_threshold,
    )
    if not over_threshold:
        return

    yield CompressionStarted(
        context_tokens=estimated_tokens,
        max_context_tokens=settings.ollama_num_ctx,
    )
    # Mid-turn keeps at least 12 recent messages (or twice the configured
    # keep-recent value, whichever is larger).
    compressed_event = await self._do_compress_and_save(
        session=session,
        llm=llm,
        model=profile.model,
        session_id=session_id,
        reason="midturn",
        keep_recent_messages=max(12, settings.context_keep_recent * 2),
    )
    if compressed_event:
        yield compressed_event
async def _consume_stream(self, stream_gen, stop_event, turn_ctx: AgentTurnContext, state: StreamState):
    """Translate LLM stream chunks into agent events while updating *state*.

    Async generator. Accumulates thinking text, answer text, tool calls and
    token counts on ``state`` / ``turn_ctx`` and yields ``ThinkingDelta``,
    ``ThinkingEnd`` and ``TextDelta`` events. Stops reading as soon as the
    stop event is set.
    """
    async for piece in stream_gen:
        if stop_event and stop_event.is_set():
            # Close an open thinking section for the UI, then stop reading.
            if state.thinking_active:
                yield ThinkingEnd()
            break

        # Token accounting: prompt tokens overwrite, completion tokens add up.
        if piece.prompt_tokens is not None:
            state.context_tokens = piece.prompt_tokens
        if piece.completion_tokens is not None:
            turn_ctx.turn_tokens += piece.completion_tokens

        # A chunk is handled as thinking first; its text delta is only
        # looked at when it carries no thinking.
        if piece.thinking:
            state.accumulated_thinking += piece.thinking
            state.thinking_active = True
            yield ThinkingDelta(delta=piece.thinking)
        elif piece.delta:
            if state.thinking_active:
                # First answer text after thinking: close the thinking section.
                state.thinking_active = False
                yield ThinkingEnd()
            state.accumulated_text += piece.delta
            yield TextDelta(delta=piece.delta)

        if piece.tool_calls:
            # The latest chunk carrying tool calls replaces any earlier value.
            state.turn_tool_calls = piece.tool_calls

        if state.thinking_active and piece.finish_reason:
            state.thinking_active = False
            yield ThinkingEnd()
async def _execute_tools_with_sink(self, turn_tool_calls, tools, turn_ctx: AgentTurnContext, session, stop_event, tool_ctx=None):
    """Execute tool calls one by one with cooperative stop support.

    Async generator. For each call it yields ``ToolStarted``, forwards the
    events the tool pushes into its event sink, then yields the final
    ``ToolEvent`` and records the tool message on the session.

    Polls *stop_event* every second while draining the event sink so the
    Stop button works even during long-running tools (terminal, SSH,
    web search, sub-agent spawn). When a tool is interrupted this way, a
    synthetic failed ``ToolEvent`` is yielded, the session is saved,
    ``StreamStopped`` is yielded and the remaining calls are skipped.

    Raises:
        LLMBackendError, LLMConnectionError: re-raised when a tool failed
            with one of these; any other tool exception is reported as a
            failed ``ToolEvent`` instead.
    """
    tool_map = {t.name: t for t in tools}
    for tc in turn_tool_calls:
        yield ToolStarted(tool_name=tc.name, arguments=tc.arguments, tool_call_id=tc.id)

        # Each tool gets its own queue. The ContextVar is set only around
        # create_task(): the task captures the context at creation time.
        sink: asyncio.Queue = asyncio.Queue()
        sink_token = current_event_sink.set(sink)
        result_holder: list = []

        # Loop variables are bound as defaults to avoid late-binding closures.
        async def _run_with_sentinel(_tc=tc, _holder=result_holder, _sink=sink):
            try:
                _holder.append(await self._tool_executor._run_single_tool(_tc, tool_map, ctx=tool_ctx))
            except Exception as exc:
                _holder.append(exc)
            finally:
                # Always signal completion, even on cancellation.
                await _sink.put(_TOOL_DONE)

        tool_task = asyncio.create_task(_run_with_sentinel())
        current_event_sink.reset(sink_token)

        try:
            # Poll sink with 1s timeout so stop_event is checked during long tools
            stopped = False
            while True:
                try:
                    item = await asyncio.wait_for(sink.get(), timeout=1.0)
                except asyncio.TimeoutError:
                    if stop_event and stop_event.is_set():
                        stopped = True
                        if not tool_task.done():
                            tool_task.cancel()
                        break
                    continue
                if item is _TOOL_DONE:
                    break
                # Accounting events are absorbed into the turn totals;
                # everything else is forwarded to the caller.
                if isinstance(item, SubagentComplete):
                    turn_ctx.subagent_tokens += item.token_count
                    turn_ctx.tool_call_count += item.tool_call_count
                elif isinstance(item, AIHelperTokensUsed):
                    turn_ctx.subagent_tokens += item.completion_tokens
                else:
                    yield item

            if stopped:
                # Tool was interrupted by user — record a synthetic cancellation result
                log.info("agent.tool_stopped", tool=tc.name)
                yield ToolEvent(
                    tool_name=tc.name, arguments=tc.arguments,
                    result="Tool execution was stopped by the user.", success=False,
                    tool_call_id=tc.id,
                )
                # Display-only: the cancellation message is not added to session.context.
                session.messages.append(Message(
                    role="tool", content="Tool execution was stopped by the user.",
                    tool_call_id=tc.id, name=tc.name, metadata={},
                    is_context=False,
                ))
                await self._sessions.save(session)
                yield StreamStopped()
                return

            r = result_holder[0] if result_holder else RuntimeError("tool task produced no result")
            if isinstance(r, Exception):
                # Backend failures abort the turn; other errors become a failed tool result.
                if isinstance(r, (LLMBackendError, LLMConnectionError)):
                    raise r
                log.warning("agent.tool_exception", tool=tc.name, error=str(r))
                tool_event = ToolEvent(
                    tool_name=tc.name, arguments=tc.arguments,
                    result=f"Error: {r}", success=False,
                    tool_call_id=tc.id,
                )
                msg = Message(role="tool", content=f"Error: {r}", tool_call_id=tc.id, name=tc.name, metadata={})
                image_msg = None
            else:
                tool_event, msg, image_msg = r
            turn_ctx.tool_call_count += 1
            yield tool_event
            session.messages.append(msg)
            session.context.append(msg)
            if image_msg:
                # The image message goes to the LLM context only.
                session.context.append(image_msg)
        finally:
            if not tool_task.done():
                tool_task.cancel()
            # Wait for the task to finish without re-raising its outcome.
            # `await tool_task` would raise asyncio.CancelledError for a
            # cancelled task, and that is a BaseException, so an
            # `except Exception` guard does not catch it. asyncio.wait() only
            # raises if this coroutine itself is cancelled.
            await asyncio.wait({tool_task})