"""
Agent: the tool-calling loop.
Flow:
1. Receive user message, load session + profile
2. Build tool schemas from profile's enabled_tools
3. Loop (up to max_iterations):
a. Call LLM with session.context (may be compressed) + tool schemas
b. If finish_reason == "stop" -> stream final response
c. If finish_reason == "tool_calls" -> execute tools, append to both
session.messages (display history) and session.context (LLM context)
4. After StreamEnd: run workers sequentially (e.g. context compression)
session.messages — full display history, never compressed
session.context — what the LLM sees; workers may compress this
"""
import asyncio
import json
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import TYPE_CHECKING, AsyncGenerator
import structlog
from navi.config import settings
from navi.exceptions import ContextTooLargeError, LLMBackendError, MaxIterationsReached, SessionNotFound
from navi.llm.base import LLMBackend, LLMChunk, Message, ToolCallRequest
from navi.tools.base import Tool, current_event_sink, current_stop_event
from .compressor import compress_context, should_compress
from .events import (
AgentEvent,
AIHelperTokensUsed,
ContextCompressed,
PlanningStatus,
PlanReady,
StreamEnd,
StreamStopped,
SubagentComplete,
TextDelta,
ThinkingDelta,
ThinkingEnd,
ToolEvent,
ToolStarted,
TurnThinking,
)
from .registry import BackendRegistry, ProfileRegistry, ToolRegistry
from .session import SessionStore
if TYPE_CHECKING:
from navi.memory.store import MemoryStore
from navi.workers.base import Worker, WorkerContext
_USER_ENABLED_FILE = Path(settings.tools_dir) / "enabled.json"
async def _iter_stream_guarded(
stream_gen: "AsyncGenerator[LLMChunk, None]",
stop_event: "asyncio.Event | None",
first_chunk_timeout: float,
chunk_timeout: float,
) -> "AsyncGenerator[LLMChunk, None]":
"""
Wraps a streaming LLM generator with two safety mechanisms:
1. Stop-event responsiveness during prefill.
Normally, the agent only checks stop_event *between* chunks. During the
prefill phase (processing input tokens) Ollama emits no chunks at all —
the first await can block for minutes on large contexts. This wrapper polls
stop_event every second so the user's Stop button works even then.
2. Timeouts as a last-resort safety net.
first_chunk_timeout: how long to wait for the first token (prefill).
chunk_timeout: max gap between subsequent tokens.
On timeout the generator is closed, which terminates the HTTP connection
to Ollama → Ollama halts generation → GPU load drops to idle.
"""
first = True
chunk_task: asyncio.Task | None = None
try:
while True:
timeout = first_chunk_timeout if first else chunk_timeout
# Create one task per chunk; reuse across poll iterations so we
# don't accidentally start multiple concurrent __anext__ calls.
chunk_task = asyncio.ensure_future(stream_gen.__anext__())
elapsed = 0.0
while True:
done, _ = await asyncio.wait({chunk_task}, timeout=1.0)
if done:
break
elapsed += 1.0
if stop_event and stop_event.is_set():
chunk_task.cancel()
try:
await chunk_task
except (asyncio.CancelledError, Exception):
pass
chunk_task = None
return
if elapsed >= timeout:
chunk_task.cancel()
try:
await chunk_task
except (asyncio.CancelledError, Exception):
pass
chunk_task = None
label = "first token (context may be too large for this model)" if first else "next token"
raise LLMBackendError(
f"LLM timed out after {elapsed:.0f}s waiting for {label}."
)
try:
chunk = chunk_task.result()
except StopAsyncIteration:
chunk_task = None
return
chunk_task = None
first = False
yield chunk
if stop_event and stop_event.is_set():
return
finally:
# Cancel any in-flight __anext__ task so we don't leave a zombie
# coroutine holding an open HTTP connection to Ollama.
if chunk_task is not None and not chunk_task.done():
chunk_task.cancel()
try:
await chunk_task
except (asyncio.CancelledError, Exception):
pass
# Closing the generator terminates the HTTP connection to Ollama,
# which signals it to stop generating (GPU returns to idle).
try:
await stream_gen.aclose()
except Exception:
pass
def _load_user_enabled_tools() -> list[str]:
try:
return json.loads(_USER_ENABLED_FILE.read_text())
except Exception:
return []
log = structlog.get_logger()
# Sentinel: placed in the event sink by the tool wrapper to signal completion.
_TOOL_DONE = object()
class Agent:
def __init__(
self,
session_store: "SessionStore | None",
profile_registry: ProfileRegistry,
tool_registry: ToolRegistry,
backend_registry: BackendRegistry,
workers: list["Worker"] | None = None,
memory_store: "MemoryStore | None" = None,
) -> None:
self._sessions = session_store
self._profiles = profile_registry
self._tools = tool_registry
self._backends = backend_registry
self._workers: list["Worker"] = workers or []
self._memory = memory_store
# ------------------------------------------------------------------
# Public interface
# ------------------------------------------------------------------
async def run(self, session_id: str, user_message: str, images: list[str] | None = None) -> str:
"""Non-streaming: run the full tool-calling loop and return the final text."""
session = await self._sessions.get(session_id)
if session is None:
raise SessionNotFound(session_id)
profile = self._profiles.get(session.profile_id)
tools = self._tool_list(profile.enabled_tools)
tool_schemas = [t.schema() for t in tools]
llm = self._get_backend(profile.llm_backend)
mem = await self._memory_msg()
# Expose session_id to tools (e.g. SSH connection pool) via ContextVar
from navi.tools.base import current_session_id as _sid_var
_sid_var.set(session_id)
user_msg = Message(role="user", content=user_message, images=images or None,
created_at=datetime.now(timezone.utc))
session.messages.append(user_msg)
session.context.append(user_msg)
await self._sessions.save(session)
for iteration in range(profile.max_iterations):
log.debug("agent.iteration", session_id=session_id, iteration=iteration)
response = await llm.complete(
self._build_context(session.context, profile, mem),
tools=tool_schemas if tools else None,
temperature=profile.temperature,
model=profile.model,
)
if response.finish_reason == "stop" or not response.tool_calls:
content = response.content or ""
assistant_msg = Message(role="assistant", content=content,
created_at=datetime.now(timezone.utc))
session.messages.append(assistant_msg)
session.context.append(assistant_msg)
await self._sessions.save(session)
return content
# Tool calls turn — append to both messages and context
assistant_msg = Message(
role="assistant",
content=response.content,
tool_calls=response.tool_calls,
)
session.messages.append(assistant_msg)
session.context.append(assistant_msg)
tool_results, image_injections = await self._execute_tool_calls(response.tool_calls, tools)
session.messages.extend(tool_results)
session.context.extend(tool_results)
# Image injections are synthetic LLM helpers — context only
session.context.extend(image_injections)
await self._sessions.save(session)
raise MaxIterationsReached(profile.max_iterations)
async def run_ephemeral(
self,
user_message: str,
profile_id: str,
max_iterations: int = 20,
exclude_tools: list[str] | None = None,
custom_system_prompt: str | None = None,
context_transfer: str | None = None,
timeout_seconds: float = 300.0,
) -> tuple[str, bool]:
"""
Run a sub-agent loop without a persistent session.
Returns (result_text, completed_normally).
completed_normally is False if the sub-agent hit the iteration limit or timed out.
Intended for spawning from tools (e.g. SpawnAgentTool).
No DB reads/writes — uses a temporary in-memory context.
Tools listed in exclude_tools are stripped from the tool list
(use this to prevent recursion: exclude 'spawn_agent').
custom_system_prompt: injected after the profile's system prompt (overrides
profile.subagent_system_prompt if provided).
context_transfer: text passed from the parent's scratchpad context_transfer
section, injected as a priming exchange before the task message.
timeout_seconds: wall-clock timeout for the entire sub-agent run.
"""
import time as _time
import uuid as _uuid
# Give each sub-agent its own scratchpad namespace so parallel or
# sequential sub-agents don't clobber each other's working notes.
from navi.tools.base import current_session_id as _sid_var, current_model as _model_var
_sid_var.set(f"subagent_{_uuid.uuid4().hex[:12]}")
profile = self._profiles.get(profile_id)
_model_var.set(profile.model)
exclude = set(exclude_tools or [])
# Use dedicated subagent_tools if configured, else fall back to enabled_tools.
tool_source = profile.subagent_tools if profile.subagent_tools else profile.enabled_tools
tools = [t for t in self._tool_list(tool_source) if t.name not in exclude]
tool_schemas = [t.schema() for t in tools]
llm = self._get_backend(profile.llm_backend)
mem = await self._memory_msg()
# Build subagent system prompt: profile.system_prompt + subagent-specific prompt.
# No persona, no profiles block — subagents are focused executors.
effective_subagent_prompt = custom_system_prompt or profile.subagent_system_prompt
subagent_sys_content = profile.system_prompt
if effective_subagent_prompt:
subagent_sys_content = subagent_sys_content + "\n\n---\n\n" + effective_subagent_prompt
subagent_sys_msg = Message(role="system", content=subagent_sys_content)
# Build initial context.
# If context_transfer is provided, inject it as a priming exchange so the
# sub-agent has the parent's working state from the start.
context: list[Message] = []
if context_transfer:
context.append(Message(
role="user",
content=f"## Context from parent agent\n\n{context_transfer}",
))
context.append(Message(
role="assistant",
content="Understood. I have the context. Ready to begin the task.",
))
context.append(Message(role="user", content=user_message, created_at=datetime.now(timezone.utc)))
# Read the event sink set by the parent run_stream() for this tool call.
# If None (e.g. called from run(), not run_stream()), events are silently dropped.
sink = current_event_sink.get()
log.info("agent.subagent.start", profile_id=profile_id, max_iterations=max_iterations,
tools=len(tools), planning=profile.subagent_planning_enabled)
stop_event = current_stop_event.get()
tool_map = {t.name: t for t in tools}
_sub_tokens: int = 0 # tokens from the final LLM call
_sub_tool_count: int = 0 # total tool calls across all iterations
_start_time = _time.monotonic()
accumulated_text = ""
# ── Optional planning phase ────────────────────────────────────────────
if profile.subagent_planning_enabled:
async for _ev in self._run_planning(context, profile, llm, mem, tool_schemas):
if isinstance(_ev, AIHelperTokensUsed):
pass # token accounting only, not forwarded
elif sink is not None:
await sink.put(_ev)
# ── Tool-calling loop ──────────────────────────────────────────────────
for iteration in range(max_iterations):
if stop_event and stop_event.is_set():
return accumulated_text, False
elapsed = _time.monotonic() - _start_time
if elapsed >= timeout_seconds:
log.warning("agent.subagent.timeout", elapsed=elapsed, timeout=timeout_seconds)
if sink is not None:
await sink.put(SubagentComplete(token_count=_sub_tokens, tool_call_count=_sub_tool_count))
return accumulated_text or "[Sub-agent timed out]", False
log.debug("agent.subagent.iteration", iteration=iteration)
accumulated_text = ""
accumulated_thinking = ""
turn_tool_calls: list[ToolCallRequest] | None = None
turn_tokens: int | None = None
# Build context inline — no persona or profiles block for subagents.
built_ctx: list[Message] = [subagent_sys_msg]
if mem:
built_ctx.append(mem)
built_ctx.extend(m for m in context if m.role != "system")
self._check_context_size(built_ctx)
async for chunk in _iter_stream_guarded(
llm.stream_complete(
built_ctx,
tools=tool_schemas if tools else None,
temperature=profile.temperature,
model=profile.model,
),
stop_event=stop_event,
first_chunk_timeout=settings.llm_stream_first_chunk_timeout,
chunk_timeout=settings.llm_stream_chunk_timeout,
):
if chunk.prompt_tokens is not None or chunk.completion_tokens is not None:
turn_tokens = (chunk.prompt_tokens or 0) + (chunk.completion_tokens or 0)
if chunk.thinking:
accumulated_thinking += chunk.thinking
if chunk.delta:
accumulated_text += chunk.delta
if chunk.tool_calls:
turn_tool_calls = chunk.tool_calls
if stop_event and stop_event.is_set():
return accumulated_text, False
if not turn_tool_calls:
log.info("agent.subagent.complete", iterations=iteration + 1,
result_len=len(accumulated_text))
_sub_tokens = turn_tokens or 0
if sink is not None:
await sink.put(SubagentComplete(
token_count=_sub_tokens,
tool_call_count=_sub_tool_count,
))
return accumulated_text, True
# Emit accumulated thinking before tool calls
if accumulated_thinking and sink is not None:
log.debug("agent.subagent.turn_thinking", length=len(accumulated_thinking))
await sink.put(TurnThinking(thinking=accumulated_thinking, is_subagent=True))
context.append(Message(
role="assistant",
content=accumulated_text or None,
tool_calls=turn_tool_calls,
))
# Execute each tool call sequentially, emitting events to parent sink
for tc in turn_tool_calls:
_sub_tool_count += 1
if sink is not None:
await sink.put(ToolStarted(
tool_name=tc.name, arguments=tc.arguments, is_subagent=True
))
tool = tool_map.get(tc.name)
image_msg = None
if tool is None:
content = f"Error: tool '{tc.name}' not found."
success = False
else:
log.info("tool.execute.subagent", tool=tc.name, args=tc.arguments)
result = await tool.execute(tc.arguments)
content = result.to_message_content()
success = result.success
if result.success and result.metadata and result.metadata.get("is_image"):
b64 = result.metadata.get("base64")
if b64:
image_msg = Message(
role="user",
content=f"[Image loaded via {tc.name} — analyse it]",
images=[b64],
)
if sink is not None:
await sink.put(ToolEvent(
tool_name=tc.name, arguments=tc.arguments,
result=content, success=success, is_subagent=True,
))
context.append(Message(role="tool", content=content,
tool_call_id=tc.id, name=tc.name))
if image_msg:
context.append(image_msg)
log.warning("agent.subagent.max_iterations", max_iterations=max_iterations)
if sink is not None:
await sink.put(SubagentComplete(token_count=_sub_tokens, tool_call_count=_sub_tool_count))
return accumulated_text or "[Sub-agent reached iteration limit without a final answer]", False
async def run_stream(
self, session_id: str, user_message: str, images: list[str] | None = None
) -> AsyncGenerator[AgentEvent, None]:
"""
Streaming variant. Yields AgentEvent objects:
- ThinkingDelta / ThinkingEnd: reasoning chunks
- ToolEvent: tool call + result
- TextDelta / StreamEnd: final streamed response
- ContextCompressed: emitted by workers after compression
"""
session = await self._sessions.get(session_id)
if session is None:
raise SessionNotFound(session_id)
profile = self._profiles.get(session.profile_id)
tools = self._tool_list(profile.enabled_tools)
tool_schemas = [t.schema() for t in tools]
llm = self._get_backend(profile.llm_backend)
mem = await self._memory_msg()
# Expose session_id and model to tools via ContextVar
from navi.tools.base import current_session_id as _sid_var, current_model as _model_var
_sid_token = _sid_var.set(session_id)
_model_var.set(profile.model)
# Pre-turn compression: if the last turn filled the context past the
# threshold, compress NOW before calling the LLM. This prevents the
# model from seeing an over-full context and generating gibberish
# (e.g. a "summary of the conversation" instead of a real answer).
if (
settings.context_compression_enabled
and session.context_token_count > 0
and should_compress(
session.context_token_count,
settings.ollama_num_ctx,
settings.context_compression_threshold,
)
):
try:
new_context = await compress_context(
context=session.context,
llm=llm,
model=profile.model,
temperature=settings.context_summary_temperature,
keep_recent=settings.context_keep_recent,
)
if new_context is not None:
count_before = len(session.context)
session.context = new_context
session.context_token_count = 0
log.info(
"agent.preturn_compress",
session_id=session_id,
before=count_before,
after=len(new_context),
)
except Exception:
log.warning("agent.preturn_compress_failed", session_id=session_id, exc_info=True)
user_msg = Message(role="user", content=user_message, images=images or None,
created_at=datetime.now(timezone.utc))
session.messages.append(user_msg)
session.context.append(user_msg)
# Persist user message immediately so it survives a client disconnect
# before the assistant reply is ready.
await self._sessions.save(session)
stop_event = current_stop_event.get()
_turn_start = time.monotonic()
_tool_call_count = 0
_subagent_tokens = 0
_prev_tokens = session.context_token_count # token baseline before this turn
# Planning phase — runs a fast, non-streaming LLM call to produce a
# step-by-step plan BEFORE the tool-calling loop starts. The plan is
# injected into session.context as an assistant message so the model
# naturally continues from it, and emitted as PlanReady for the UI.
if profile.planning_enabled:
async for _ev in self._run_planning(session.context, profile, llm, mem, tool_schemas, messages=session.messages):
if isinstance(_ev, AIHelperTokensUsed):
# Accumulate planning token usage into the turn total (not forwarded to WS)
_subagent_tokens += _ev.total
else:
yield _ev
# Tool-calling loop — uses stream_complete() for every turn so thinking
# is captured in real-time via ThinkingDelta/ThinkingEnd events.
for iteration in range(profile.max_iterations):
# Cooperative stop: check before each LLM call
if stop_event and stop_event.is_set():
await self._sessions.save(session)
yield StreamStopped()
return
accumulated_text = ""
accumulated_thinking = ""
turn_tool_calls: list[ToolCallRequest] | None = None
thinking_active = False
context_tokens: int | None = None
built_ctx = self._build_context(session.context, profile, mem)
try:
self._check_context_size(built_ctx)
except ContextTooLargeError as e:
# Surface the error as a Navi response (not a raw system error) so the
# user sees a coherent message and the exchange is saved to history.
error_text = str(e)
assistant_msg = Message(role="assistant", content=error_text)
session.context.append(assistant_msg)
session.messages.append(assistant_msg)
await self._sessions.save(session)
yield TextDelta(delta=error_text)
yield StreamEnd(content=error_text)
return
async for chunk in _iter_stream_guarded(
llm.stream_complete(
built_ctx,
tools=tool_schemas if tools else None,
temperature=profile.temperature,
model=profile.model,
),
stop_event=stop_event,
first_chunk_timeout=settings.llm_stream_first_chunk_timeout,
chunk_timeout=settings.llm_stream_chunk_timeout,
):
# Cooperative stop: break cleanly so aclose() is called on the
# generator → Ollama stream closes gracefully, model stays in VRAM.
if stop_event and stop_event.is_set():
if thinking_active:
yield ThinkingEnd()
break
if chunk.prompt_tokens is not None or chunk.completion_tokens is not None:
context_tokens = (chunk.prompt_tokens or 0) + (chunk.completion_tokens or 0)
if chunk.thinking:
accumulated_thinking += chunk.thinking
if not thinking_active:
thinking_active = True
yield ThinkingDelta(delta=chunk.thinking)
elif chunk.delta:
if thinking_active:
thinking_active = False
yield ThinkingEnd()
accumulated_text += chunk.delta
yield TextDelta(delta=chunk.delta)
if chunk.tool_calls:
turn_tool_calls = chunk.tool_calls
if chunk.finish_reason and thinking_active:
thinking_active = False
yield ThinkingEnd()
# Stopped mid-stream — save partial response and exit
if stop_event and stop_event.is_set():
if accumulated_text:
session.messages.append(Message(
role="assistant", content=accumulated_text,
created_at=datetime.now(timezone.utc),
))
await self._sessions.save(session)
yield StreamStopped()
return
if not turn_tool_calls:
# Final response — text already streamed above
_elapsed = round(time.monotonic() - _turn_start, 1)
# Net tokens = marginal cost of this turn (delta from baseline) + subagent tokens
_net_tokens = max(0, (context_tokens or 0) - _prev_tokens) + _subagent_tokens
assistant_msg = Message(
role="assistant",
content=accumulated_text or None,
thinking=accumulated_thinking or None,
created_at=datetime.now(timezone.utc),
elapsed_seconds=_elapsed,
tool_call_count=_tool_call_count if _tool_call_count else None,
token_count=_net_tokens if _net_tokens else None,
)
session.messages.append(assistant_msg)
session.context.append(assistant_msg)
session.context_token_count = context_tokens or 0
await self._sessions.save(session)
yield StreamEnd(
full_content=accumulated_text,
context_tokens=context_tokens,
max_context_tokens=settings.ollama_num_ctx,
elapsed_seconds=_elapsed,
tool_call_count=_tool_call_count,
token_count=_net_tokens if _net_tokens else None,
)
for event in await self._run_workers(session, llm, profile.model, context_tokens):
yield event
return
# Tool calls turn — record to session and execute
assistant_msg = Message(
role="assistant",
content=accumulated_text or None,
thinking=accumulated_thinking or None,
tool_calls=turn_tool_calls,
)
session.messages.append(assistant_msg)
session.context.append(assistant_msg)
tool_map = {t.name: t for t in tools}
for tc in turn_tool_calls:
# 1. Announce immediately so the UI shows a pending card
yield ToolStarted(tool_name=tc.name, arguments=tc.arguments)
# 2. Create a sink queue for sub-agent events from this tool call.
# create_task() snapshots the current ContextVar values, so the
# task will inherit current_event_sink = sink.
sink: asyncio.Queue = asyncio.Queue()
sink_token = current_event_sink.set(sink)
result_holder: list = []
async def _run_with_sentinel(_tc=tc, _holder=result_holder, _sink=sink):
try:
_holder.append(await self._run_single_tool(_tc, tool_map))
except BaseException as exc:
_holder.append(exc)
finally:
await _sink.put(_TOOL_DONE)
tool_task = asyncio.create_task(_run_with_sentinel())
current_event_sink.reset(sink_token) # outer ctx restored; task has its own copy
# 3. Block on the sink until the sentinel arrives.
# Sub-agent ToolStarted/ToolEvent objects come through here in real time.
while True:
item = await sink.get()
if item is _TOOL_DONE:
break
if isinstance(item, SubagentComplete):
_subagent_tokens += item.token_count
_tool_call_count += item.tool_call_count
elif isinstance(item, AIHelperTokensUsed):
_subagent_tokens += item.total
else:
yield item
# 4. Re-raise tool exception or unpack result
r = result_holder[0] if result_holder else RuntimeError("tool task produced no result")
if isinstance(r, BaseException):
raise r
tool_event, msg, image_msg = r
# 5. Yield the completed ToolEvent and record in session
_tool_call_count += 1
yield tool_event
session.messages.append(msg)
session.context.append(msg)
if image_msg:
session.context.append(image_msg)
# 6. Cooperative stop: check after tool execution before next LLM call
if stop_event and stop_event.is_set():
await self._sessions.save(session)
yield StreamStopped()
return
# 7. If switch_profile was called this iteration, reload profile + tools.
# switch_profile updates the DB but run_stream() holds a local session
# object — without this check the final save would overwrite the change
# and the next LLM call would still use the old tool schemas.
fresh = await self._sessions.get(session_id)
if fresh and fresh.profile_id != session.profile_id:
session.profile_id = fresh.profile_id
profile = self._profiles.get(session.profile_id)
tools = self._tool_list(profile.enabled_tools)
tool_schemas = [t.schema() for t in tools]
llm = self._get_backend(profile.llm_backend)
log.info(
"agent.profile_reloaded",
session_id=session_id,
new_profile=session.profile_id,
)
await self._sessions.save(session)
raise MaxIterationsReached(profile.max_iterations)
# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------
async def _run_planning(
self,
context: "list[Message]",
profile,
llm: LLMBackend,
mem: "Message | None",
tool_schemas: list | None = None,
messages: "list[Message] | None" = None,
):
"""
Three-phase planning (async generator):
Phase 1 — Analysis (think=True): reformulate the task, identify subtasks and
unknowns. Returns DIRECT for simple requests to skip planning entirely.
Phase 2 — Execution plan: assign each subtask to a specific executor
(TOOL / AGENT / SELF) with awareness of actually available tools.
Phase 3 — AIHelper critic: independent pass that validates executor assignments
against the real tool list and fixes any mismatches.
Yields PlanningStatus before each phase so the UI can show progress,
then yields PlanReady when the final plan is ready.
Yields nothing if planning is skipped.
"""
import re as _re
# ── Build compact tool list for Phase 2 / Phase 3 ─────────────────────
if tool_schemas:
tool_lines = []
for schema in tool_schemas:
fn = schema.function if hasattr(schema, "function") else schema.get("function", {})
name = fn.get("name", "")
desc = (fn.get("description") or "").split("\n")[0][:80]
tool_lines.append(f" - {name}: {desc}")
available_tools_block = (
"Available tools (use these exact names for TOOL: assignments):\n"
+ "\n".join(tool_lines)
+ "\n\n"
)
tool_names_list = ", ".join(
(schema.function if hasattr(schema, "function") else schema.get("function", {})).get("name", "")
for schema in tool_schemas
)
else:
available_tools_block = ""
tool_names_list = ""
# Read stop event once — checked between all phases
_stop = current_stop_event.get()
# ── Phase 1: Task analysis (with reasoning) ────────────────────────────
yield PlanningStatus(phase=1, label="Analysing task...")
phase1_system = Message(
role="system",
content=(
self._build_system_prompt(profile)
+ "\n\n---\n\n"
"[PLANNING — PHASE 1: ANALYSIS]\n\n"
"Read the user's latest request.\n\n"
"If it is a simple question, casual conversation, or answerable in one step "
"without tools — respond with exactly: DIRECT\n\n"
"Otherwise analyse the request and output:\n\n"
"TASK: [one clear sentence — what actually needs to be done]\n"
"GOAL: [how you will know the task is complete]\n"
"UNKNOWNS: [genuine uncertainties that could block execution, or NONE]\n"
"SUBTASKS:\n"
"1. [discrete unit of work]\n"
"2. [discrete unit of work]\n\n"
"Rules: maximum 6 subtasks. Each must be concrete and actionable. "
"No execution yet — analysis only."
),
)
phase1_ctx: list[Message] = [phase1_system]
if mem:
phase1_ctx.append(mem)
phase1_ctx.extend(m for m in context if m.role != "system")
try:
r1 = await asyncio.wait_for(
llm.complete(phase1_ctx, tools=None, temperature=0.3, model=profile.model, think=True),
timeout=settings.llm_complete_timeout,
)
analysis = (r1.content or "").strip()
except asyncio.TimeoutError:
log.warning("agent.planning_phase1_timeout", timeout=settings.llm_complete_timeout)
return
except Exception:
log.warning("agent.planning_phase1_failed", exc_info=True)
return
if r1.prompt_tokens or r1.completion_tokens:
yield AIHelperTokensUsed(
prompt_tokens=r1.prompt_tokens or 0,
completion_tokens=r1.completion_tokens or 0,
)
if not analysis or analysis.upper().startswith("DIRECT"):
log.debug("agent.planning_skipped", reason="direct")
return
if _stop and _stop.is_set():
log.debug("agent.planning_stopped", phase=1)
return
# ── Phase 2: Execution plan ────────────────────────────────────────────
yield PlanningStatus(phase=2, label="Building execution plan...")
phase2_system = Message(
role="system",
content=(
self._build_system_prompt(profile)
+ "\n\n---\n\n"
"[PLANNING — PHASE 2: EXECUTION PLAN]\n\n"
"Task analysis:\n\n"
f"{analysis}\n\n"
"---\n\n"
+ available_tools_block
+ "Now write the execution plan. For each subtask assign a specific executor:\n"
"- TOOL: <tool_name> — a single tool call is enough; use exact tool names from the list above\n"
"- AGENT: <profile_id> — needs multiple steps; a subagent must handle it\n"
"- SELF — final synthesis or a context-dependent single action only\n\n"
"Required output format (use exactly this structure):\n\n"
"## Plan\n\n"
"**Task:** [reformulated task]\n"
"**Goal:** [success criterion]\n\n"
"**Steps:**\n"
"1. [description] → TOOL: tool_name\n"
"2. [description] → AGENT: profile_id\n"
"3. [description] → SELF\n\n"
"**Parallel:** [step numbers that can run simultaneously, or NONE]\n"
"**Risks:** [unknowns to watch for, or NONE]\n\n"
"Do not write prose. Do not start executing. Plan only."
),
)
# Phase 2 only needs the analysis (embedded above) and the original request.
# Full history is intentionally excluded to keep the focus on plan structure.
phase2_ctx: list[Message] = [phase2_system]
if mem:
phase2_ctx.append(mem)
user_msgs = [m for m in session.context if m.role == "user"]
if user_msgs:
phase2_ctx.append(user_msgs[-1])
try:
r2 = await asyncio.wait_for(
llm.complete(phase2_ctx, tools=None, temperature=0.3, model=profile.model, think=False),
timeout=settings.llm_complete_timeout,
)
plan_text = (r2.content or "").strip()
except asyncio.TimeoutError:
log.warning("agent.planning_phase2_timeout", timeout=settings.llm_complete_timeout)
return
except Exception:
log.warning("agent.planning_phase2_failed", exc_info=True)
return
if r2.prompt_tokens or r2.completion_tokens:
yield AIHelperTokensUsed(
prompt_tokens=r2.prompt_tokens or 0,
completion_tokens=r2.completion_tokens or 0,
)
if not plan_text:
return
# Must have at least one numbered step
if not _re.search(r"^\s*\d+[\.\)]", plan_text, _re.MULTILINE):
log.debug("agent.planning_skipped", reason="no_numbered_steps")
return
if _stop and _stop.is_set():
log.debug("agent.planning_stopped", phase=2)
return
# ── Phase 3: AIHelper plan critic ──────────────────────────────────────
yield PlanningStatus(phase=3, label="Reviewing plan...")
# Independent pass: validates and corrects executor assignments against
# the actual tool list. Falls back to Phase 2 plan on any failure.
if tool_names_list:
try:
critic_system = (
"You are a plan validator for an AI assistant. "
"You receive a task analysis, an execution plan draft, and a list of available tools.\n\n"
"Your job:\n"
"1. Every \"TOOL: <name>\" step must use a name from the available tools list exactly. "
"If a tool name is wrong or missing, replace it with the closest available tool, "
"or change the executor to SELF.\n"
"2. A TOOL step must require only a single tool call. "
"If a step clearly needs multiple calls, change its executor to SELF.\n"
"3. Steps must be in logical dependency order — no step may depend on a result "
"from a later step.\n\n"
"Rules:\n"
"- Do not add or remove steps.\n"
"- Do not rewrite descriptions unless fixing an executor.\n"
"- Return the corrected plan in the same ## Plan format.\n"
"- If the plan is already correct, return it unchanged.\n"
"- No commentary. No preamble. Return the plan only."
)
critic_prompt = (
f"Available tools: {tool_names_list}\n\n"
f"Task analysis:\n{analysis}\n\n"
f"Execution plan draft:\n{plan_text}"
)
phase3_ctx = [
Message(role="system", content=critic_system),
Message(role="user", content=critic_prompt),
]
r3 = await asyncio.wait_for(
llm.complete(phase3_ctx, tools=None, temperature=0.1, model=profile.model, think=False),
timeout=settings.llm_complete_timeout,
)
reviewed = (r3.content or "").strip()
if r3.prompt_tokens or r3.completion_tokens:
yield AIHelperTokensUsed(
prompt_tokens=r3.prompt_tokens or 0,
completion_tokens=r3.completion_tokens or 0,
)
# Accept the reviewed plan only if it still has numbered steps
if reviewed and _re.search(r"^\s*\d+[\.\)]", reviewed, _re.MULTILINE):
plan_text = reviewed
log.debug("agent.planning_phase3_applied", length=len(plan_text))
else:
log.warning("agent.planning_phase3_invalid", preview=reviewed[:120])
except asyncio.TimeoutError:
log.warning("agent.planning_phase3_timeout")
except Exception:
log.warning("agent.planning_phase3_failed", exc_info=True)
# Warn if executor assignments are still missing (plan may be malformed)
if not _re.search(r"(TOOL:|AGENT:|→\s*SELF)", plan_text):
log.warning("agent.planning_no_executors", hint="plan lacks TOOL/AGENT/SELF assignments")
# Inject plan into context so the main loop continues from it,
# and into messages (with is_plan flag) so the UI can render a plan card after reload.
context.append(Message(role="assistant", content=plan_text))
if messages is not None:
messages.append(Message(role="assistant", content=plan_text, is_plan=True))
log.debug("agent.plan_ready", phases=3, length=len(plan_text))
yield PlanReady(plan=plan_text)
async def _run_workers(
self,
session,
llm: LLMBackend,
model: str,
context_tokens: int | None,
) -> list[AgentEvent]:
"""Run all workers sequentially; collect their events."""
from navi.workers.base import WorkerContext
ctx = WorkerContext(
session_id=session.id,
context_tokens=context_tokens,
max_context_tokens=settings.ollama_num_ctx,
llm=llm,
model=model,
temperature=settings.context_summary_temperature,
session_store=self._sessions,
)
events: list[AgentEvent] = []
for worker in self._workers:
try:
result = await worker.run(session, ctx)
events.extend(result.events)
except Exception:
log.warning("agent.worker_failed",
worker=type(worker).__name__, exc_info=True)
return events
async def _memory_msg(self) -> "Message | None":
"""Return an ephemeral system message with the user memory summary, or None."""
if not self._memory:
return None
summary = await self._memory.get_summary()
if not summary:
return None
return Message(role="system", content=f"## What I remember about the user\n\n{summary}")
def _build_context(
self,
session_context: list[Message],
profile: "AgentProfile",
mem: "Message | None",
) -> list[Message]:
"""Build the full LLM context for one call.
System prompt is injected fresh from the current profile every time —
it is NOT stored in session.context so that profile switches take
effect immediately without touching stored history.
Memory (if any) is placed right after the system message.
Any system messages already in session.context are stripped (migration safety).
"""
system_msg = Message(
role="system",
content=self._build_system_prompt(profile),
)
conv = [m for m in session_context if m.role != "system"]
result: list[Message] = [system_msg]
if mem:
result.append(mem)
result.extend(conv)
return result
def _build_system_prompt(self, profile: "AgentProfile") -> str:
parts: list[str] = []
persona = settings.navi_persona.strip()
if persona:
parts.append(persona)
parts.append(profile.system_prompt)
# Compact profiles block — every agent knows what other profiles exist
# and when to switch. Injected dynamically so new profiles appear automatically.
other = [p for p in self._profiles.all() if p.id != profile.id]
if other:
lines = [
"## Available profiles",
f"Current: **{profile.id}**",
]
for p in other:
desc = p.short_description or p.description
lines.append(f"· {p.id}: {desc}")
lines.append("→ Switch profiles on your own judgment — do not ask for permission. When a task clearly fits another profile, call switch_profile immediately, then inform the user which profile is now active and why. Use list_profiles if you need details about a profile's capabilities.")
parts.append("\n".join(lines))
return "\n\n---\n\n".join(parts)
def _tool_list(self, enabled: list[str]) -> list[Tool]:
names = list(enabled)
extra = _load_user_enabled_tools()
for name in extra:
if name not in names:
names.append(name)
result = []
for name in names:
try:
result.append(self._tools.get(name))
except Exception:
pass
return result
def _get_backend(self, backend_key: str) -> LLMBackend:
return self._backends.get(backend_key)
def _check_context_size(self, context: list[Message]) -> None:
"""Raise ContextTooLargeError before an LLM call if the context is dangerously large.
Uses a conservative character-based estimate (~4 chars per token for text).
Images are counted at 500 tokens each (rough vision-model estimate).
Checks against the *remaining* budget, not a fixed percentage of the window:
available_for_input = ollama_num_ctx - output_reserve
where output_reserve is a fixed token headroom reserved for the model's response.
This correctly accounts for sessions where conversation history already consumes
a large portion of the window.
"""
if not context:
return
output_reserve = 2048 # tokens reserved for the model's own response
def _estimate(msgs: list[Message]) -> int:
chars = sum(len(m.content or "") for m in msgs)
imgs = sum(500 for m in msgs if m.images)
return chars // 4 + imgs
total = _estimate(context)
available = settings.ollama_num_ctx - output_reserve
if total > available:
existing = _estimate(context[:-1])
new = _estimate(context[-1:])
remaining = available - existing
raise ContextTooLargeError(
f"Context too large: new content is ~{new:,} estimated tokens, "
f"but only ~{max(0, remaining):,} tokens are available "
f"(window {settings.ollama_num_ctx:,}, already used ~{existing:,}, "
f"output_reserve {output_reserve:,}). "
"Split the file into smaller parts or delegate to a subagent."
)
async def _run_single_tool(
self,
tc: ToolCallRequest,
tool_map: dict[str, Tool],
) -> tuple[ToolEvent, Message, "Message | None"]:
"""Execute one tool call and return (ToolEvent, tool_msg, optional_image_msg).
Called via asyncio.create_task() from run_stream() so that the parent
generator can drain the event sink queue concurrently.
"""
tool = tool_map.get(tc.name)
image_msg = None
if tool is None:
content = f"Error: tool '{tc.name}' not found."
event = ToolEvent(tool_name=tc.name, arguments=tc.arguments,
result=content, success=False)
else:
log.info("tool.execute", tool=tc.name, args=tc.arguments)
result = await tool.execute(tc.arguments)
content = result.to_message_content()
event = ToolEvent(tool_name=tc.name, arguments=tc.arguments,
result=content, success=result.success)
if result.success and result.metadata and result.metadata.get("is_image"):
b64 = result.metadata.get("base64")
if b64:
image_msg = Message(
role="user",
content=f"[Image loaded via {tc.name} — analyse it]",
images=[b64],
)
msg = Message(role="tool", content=content, tool_call_id=tc.id, name=tc.name)
return event, msg, image_msg
async def _execute_tool_calls(
self, tool_calls: list[ToolCallRequest], tools: list[Tool]
) -> tuple[list[Message], list[Message]]:
tool_map = {t.name: t for t in tools}
async def _run_one(tc: ToolCallRequest) -> tuple[Message, Message | None]:
tool = tool_map.get(tc.name)
image_msg = None
if tool is None:
content = f"Error: tool '{tc.name}' not found."
else:
log.info("tool.execute", tool=tc.name, args=tc.arguments)
result = await tool.execute(tc.arguments)
content = result.to_message_content()
if result.success and result.metadata and result.metadata.get("is_image"):
b64 = result.metadata.get("base64")
if b64:
image_msg = Message(
role="user",
content=f"[Image loaded via {tc.name} — analyse it]",
images=[b64],
)
tool_msg = Message(role="tool", content=content, tool_call_id=tc.id, name=tc.name)
return tool_msg, image_msg
pairs = await asyncio.gather(*[_run_one(tc) for tc in tool_calls])
tool_msgs = [p[0] for p in pairs]
image_msgs = [p[1] for p in pairs if p[1] is not None]
return tool_msgs, image_msgs
async def _execute_tool_calls_streaming(
self, tool_calls: list[ToolCallRequest], tools: list[Tool]
) -> tuple[list[tuple[ToolEvent, Message]], list[Message]]:
tool_map = {t.name: t for t in tools}
async def _run_one(tc: ToolCallRequest) -> tuple[ToolEvent, Message, Message | None]:
tool = tool_map.get(tc.name)
image_msg = None
if tool is None:
content = f"Error: tool '{tc.name}' not found."
event = ToolEvent(
tool_name=tc.name, arguments=tc.arguments, result=content, success=False
)
else:
log.info("tool.execute", tool=tc.name, args=tc.arguments)
result = await tool.execute(tc.arguments)
content = result.to_message_content()
event = ToolEvent(
tool_name=tc.name,
arguments=tc.arguments,
result=content,
success=result.success,
)
if result.success and result.metadata and result.metadata.get("is_image"):
b64 = result.metadata.get("base64")
if b64:
image_msg = Message(
role="user",
content=f"[Image loaded via {tc.name} — analyse it]",
images=[b64],
)
msg = Message(role="tool", content=content, tool_call_id=tc.id, name=tc.name)
return event, msg, image_msg
triples = await asyncio.gather(*[_run_one(tc) for tc in tool_calls])
pairs = [(t[0], t[1]) for t in triples]
image_msgs = [t[2] for t in triples if t[2] is not None]
return pairs, image_msgs