"""
Agent: the tool-calling loop.

Flow:
1. Receive user message, load session + profile
2. Build tool schemas from profile's enabled_tools
   (streaming only: compress session.context first if the previous turn
   pushed it past the compression threshold, then optionally run a
   planning phase when the profile enables it)
3. Loop (up to max_iterations):
   a. Call LLM with session.context (may be compressed) + tool schemas
   b. If the model requests no tool calls -> that (streamed) text is the
      final response
   c. If the model requests tool calls -> execute tools, append results to both
      session.messages (display history) and session.context (LLM context)
4. After StreamEnd: run workers sequentially (e.g. context compression)

session.messages — full display history, never compressed
session.context  — what the LLM sees; workers may compress this
"""
import asyncio
import json
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import TYPE_CHECKING, AsyncGenerator
import structlog
from navi.config import settings
from navi.exceptions import ContextTooLargeError, LLMBackendError, MaxIterationsReached, SessionNotFound
from navi.llm.base import LLMBackend, LLMChunk, Message, ToolCallRequest
from navi.tools.base import Tool, current_event_sink, current_stop_event
from .compressor import compress_context, should_compress
from .events import (
AgentEvent,
ContextCompressed,
PlanReady,
StreamEnd,
StreamStopped,
SubagentComplete,
TextDelta,
ThinkingDelta,
ThinkingEnd,
ToolEvent,
ToolStarted,
TurnThinking,
)
from .registry import BackendRegistry, ProfileRegistry, ToolRegistry
from .session import SessionStore
if TYPE_CHECKING:
from navi.memory.store import MemoryStore
from navi.workers.base import Worker, WorkerContext
# JSON list of extra tool names the user has enabled; read on every
# _tool_list() call via _load_user_enabled_tools().
_USER_ENABLED_FILE = Path(settings.tools_dir).joinpath("enabled.json")
async def _iter_stream_guarded(
    stream_gen: "AsyncGenerator[LLMChunk, None]",
    stop_event: "asyncio.Event | None",
    first_chunk_timeout: float,
    chunk_timeout: float,
) -> "AsyncGenerator[LLMChunk, None]":
    """
    Wrap a streaming LLM generator with stop-responsiveness and timeouts.

    The wrapper adds two safety mechanisms.

    1. Stop-event responsiveness during prefill.
       Normally, the agent only checks stop_event *between* chunks. During the
       prefill phase (processing input tokens), Ollama emits no chunks at all,
       so the first await can block for minutes on large contexts. This wrapper
       polls stop_event every second so the user's Stop button works even then.

    2. Timeouts as a last-resort safety net.
       On timeout the generator is closed. This terminates the HTTP connection
       to Ollama, Ollama halts generation, and GPU load drops to idle.

    Args:
        stream_gen: the backend's chunk generator. It is always closed
            (``aclose()``) when this wrapper finishes, for whatever reason.
        stop_event: optional cooperative-stop flag. When it is set, the wrapper
            simply returns (no exception). It is checked while waiting for a
            chunk and again after each yielded chunk.
        first_chunk_timeout: seconds to wait for the first token (prefill).
        chunk_timeout: max seconds between subsequent tokens.
            Both timeouts are measured in 1-second poll steps, so in effect they
            are rounded up to whole seconds.

    Yields:
        Chunks from ``stream_gen``, unchanged.

    Raises:
        LLMBackendError: when a timeout expires.
        Any exception raised by ``stream_gen`` itself is re-raised via
        ``chunk_task.result()``.
    """
    first = True
    chunk_task: asyncio.Task | None = None
    try:
        while True:
            timeout = first_chunk_timeout if first else chunk_timeout
            # Create one task per chunk; reuse across poll iterations so we
            # don't accidentally start multiple concurrent __anext__ calls.
            chunk_task = asyncio.ensure_future(stream_gen.__anext__())
            elapsed = 0.0
            while True:
                # Wake at least once per second to re-check stop/timeout.
                done, _ = await asyncio.wait({chunk_task}, timeout=1.0)
                if done:
                    break
                elapsed += 1.0
                if stop_event and stop_event.is_set():
                    chunk_task.cancel()
                    try:
                        await chunk_task
                    except (asyncio.CancelledError, Exception):
                        pass
                    # Already drained; tell the finally block not to cancel again.
                    chunk_task = None
                    return
                if elapsed >= timeout:
                    chunk_task.cancel()
                    try:
                        await chunk_task
                    except (asyncio.CancelledError, Exception):
                        pass
                    chunk_task = None
                    label = "first token (context may be too large for this model)" if first else "next token"
                    raise LLMBackendError(
                        f"LLM timed out after {elapsed:.0f}s waiting for {label}."
                    )
            try:
                # Re-raises any error from the backend generator.
                chunk = chunk_task.result()
            except StopAsyncIteration:
                # Underlying stream finished normally.
                chunk_task = None
                return
            chunk_task = None
            first = False
            yield chunk
            # The consumer may have set the stop flag while handling this chunk.
            if stop_event and stop_event.is_set():
                return
    finally:
        # Cancel any in-flight __anext__ task so we don't leave a zombie
        # coroutine holding an open HTTP connection to Ollama.
        if chunk_task is not None and not chunk_task.done():
            chunk_task.cancel()
            try:
                await chunk_task
            except (asyncio.CancelledError, Exception):
                pass
        # Closing the generator terminates the HTTP connection to Ollama,
        # which signals it to stop generating (GPU returns to idle).
        try:
            await stream_gen.aclose()
        except Exception:
            pass
def _load_user_enabled_tools() -> list[str]:
try:
return json.loads(_USER_ENABLED_FILE.read_text())
except Exception:
return []
# Module-level structlog logger used throughout the agent loop and its helpers.
log = structlog.get_logger()
# Sentinel: placed in the event sink by the tool wrapper to signal completion.
# run_stream() compares it by identity (`item is _TOOL_DONE`), so a plain
# object() cannot collide with any real event placed in the sink.
_TOOL_DONE = object()
class Agent:
    """Tool-calling agent that drives an LLM over a session's context.

    Resolves each session's profile, tools and LLM backend through the
    injected registries. Optionally:
    - runs post-turn workers (e.g. context compression),
    - injects a user-memory summary into every LLM call.
    """

    def __init__(
        self,
        session_store: "SessionStore | None",
        profile_registry: ProfileRegistry,
        tool_registry: ToolRegistry,
        backend_registry: BackendRegistry,
        workers: list["Worker"] | None = None,
        memory_store: "MemoryStore | None" = None,
    ) -> None:
        self._sessions = session_store
        self._profiles = profile_registry
        self._tools = tool_registry
        self._backends = backend_registry
        self._workers: list["Worker"] = workers or []
        self._memory = memory_store

    # ------------------------------------------------------------------
    # Public interface
    # ------------------------------------------------------------------

    async def run(self, session_id: str, user_message: str, images: list[str] | None = None) -> str:
        """Non-streaming: run the full tool-calling loop and return the final text.

        Raises:
            SessionNotFound: if *session_id* does not exist.
            MaxIterationsReached: if the profile's iteration budget runs out
                before the model answers without tool calls.
        """
        session = await self._sessions.get(session_id)
        if session is None:
            raise SessionNotFound(session_id)
        profile = self._profiles.get(session.profile_id)
        tools = self._tool_list(profile.enabled_tools)
        tool_schemas = [t.schema() for t in tools]
        llm = self._get_backend(profile.llm_backend)
        mem = await self._memory_msg()
        # Expose session_id to tools (e.g. SSH connection pool) via ContextVar
        from navi.tools.base import current_session_id as _sid_var
        _sid_var.set(session_id)
        user_msg = Message(role="user", content=user_message, images=images or None,
                           created_at=datetime.now(timezone.utc))
        session.messages.append(user_msg)
        session.context.append(user_msg)
        await self._sessions.save(session)
        for iteration in range(profile.max_iterations):
            log.debug("agent.iteration", session_id=session_id, iteration=iteration)
            response = await llm.complete(
                self._build_context(session.context, profile, mem),
                tools=tool_schemas if tools else None,
                temperature=profile.temperature,
                model=profile.model,
            )
            if response.finish_reason == "stop" or not response.tool_calls:
                content = response.content or ""
                assistant_msg = Message(role="assistant", content=content,
                                        created_at=datetime.now(timezone.utc))
                session.messages.append(assistant_msg)
                session.context.append(assistant_msg)
                await self._sessions.save(session)
                return content
            # Tool calls turn — append to both messages and context
            assistant_msg = Message(
                role="assistant",
                content=response.content,
                tool_calls=response.tool_calls,
            )
            session.messages.append(assistant_msg)
            session.context.append(assistant_msg)
            tool_results, image_injections = await self._execute_tool_calls(response.tool_calls, tools)
            session.messages.extend(tool_results)
            session.context.extend(tool_results)
            # Image injections are synthetic LLM helpers — context only
            session.context.extend(image_injections)
            await self._sessions.save(session)
        raise MaxIterationsReached(profile.max_iterations)

    async def run_ephemeral(
        self,
        user_message: str,
        profile_id: str,
        max_iterations: int = 20,
        exclude_tools: list[str] | None = None,
    ) -> str:
        """
        Run a sub-agent loop without a persistent session.

        Intended for spawning from tools (e.g. SpawnAgentTool).
        - No DB reads/writes: uses a temporary in-memory context.
        - Tools listed in exclude_tools are stripped from the tool list
          (use this to prevent recursion: exclude 'spawn_agent').
        - Tool activity is forwarded to the parent's event sink, when present.
          A SubagentComplete event reports the most recent LLM call's token
          count and the total tool-call count.

        Returns the final answer text. When stopped, it returns the most
        recent (possibly partial) turn text. When the iteration limit is hit,
        it returns a fixed notice.
        """
        import uuid as _uuid
        # Give each sub-agent its own scratchpad namespace so parallel or
        # sequential sub-agents don't clobber each other's working notes.
        from navi.tools.base import current_session_id as _sid_var
        _sid_var.set(f"subagent_{_uuid.uuid4().hex[:12]}")
        profile = self._profiles.get(profile_id)
        exclude = set(exclude_tools or [])
        tools = [t for t in self._tool_list(profile.enabled_tools) if t.name not in exclude]
        tool_schemas = [t.schema() for t in tools]
        llm = self._get_backend(profile.llm_backend)
        mem = await self._memory_msg()
        # Sub-agent context: only user/assistant/tool messages — system is injected dynamically.
        context: list[Message] = [
            Message(role="user", content=user_message, created_at=datetime.now(timezone.utc))
        ]
        # Read the event sink set by the parent run_stream() for this tool call.
        # If None (e.g. called from run(), not run_stream()), events are silently dropped.
        sink = current_event_sink.get()
        log.info("agent.subagent.start", profile_id=profile_id, max_iterations=max_iterations)
        stop_event = current_stop_event.get()
        tool_map = {t.name: t for t in tools}
        _sub_tokens: int = 0  # tokens from the most recent LLM call
        _sub_tool_count: int = 0  # total tool calls across all iterations
        # Text of the most recent turn; initialised here so the stop check
        # below never reads an unbound name on the first iteration.
        accumulated_text = ""
        for iteration in range(max_iterations):
            if stop_event and stop_event.is_set():
                return accumulated_text
            log.debug("agent.subagent.iteration", iteration=iteration)
            accumulated_text = ""
            accumulated_thinking = ""
            turn_tool_calls: list[ToolCallRequest] | None = None
            turn_tokens: int | None = None
            built_ctx = self._build_context(context, profile, mem)
            self._check_context_size(built_ctx)
            async for chunk in _iter_stream_guarded(
                llm.stream_complete(
                    built_ctx,
                    tools=tool_schemas if tools else None,
                    temperature=profile.temperature,
                    model=profile.model,
                ),
                stop_event=stop_event,
                first_chunk_timeout=settings.llm_stream_first_chunk_timeout,
                chunk_timeout=settings.llm_stream_chunk_timeout,
            ):
                if chunk.prompt_tokens is not None or chunk.completion_tokens is not None:
                    turn_tokens = (chunk.prompt_tokens or 0) + (chunk.completion_tokens or 0)
                if chunk.thinking:
                    accumulated_thinking += chunk.thinking
                if chunk.delta:
                    accumulated_text += chunk.delta
                if chunk.tool_calls:
                    turn_tool_calls = chunk.tool_calls
            if stop_event and stop_event.is_set():
                return accumulated_text
            # Track every turn, not only the final one, so the max-iterations
            # path below reports real usage instead of 0.
            _sub_tokens = turn_tokens or 0
            if not turn_tool_calls:
                log.info("agent.subagent.complete", iterations=iteration + 1,
                         result_len=len(accumulated_text))
                if sink is not None:
                    await sink.put(SubagentComplete(
                        token_count=_sub_tokens,
                        tool_call_count=_sub_tool_count,
                    ))
                return accumulated_text
            # Emit accumulated thinking before tool calls
            if accumulated_thinking and sink is not None:
                log.debug("agent.subagent.turn_thinking", length=len(accumulated_thinking))
                await sink.put(TurnThinking(thinking=accumulated_thinking, is_subagent=True))
            context.append(Message(
                role="assistant",
                content=accumulated_text or None,
                tool_calls=turn_tool_calls,
            ))
            # Execute each tool call sequentially, emitting events to parent sink
            for tc in turn_tool_calls:
                _sub_tool_count += 1
                if sink is not None:
                    await sink.put(ToolStarted(
                        tool_name=tc.name, arguments=tc.arguments, is_subagent=True
                    ))
                tool = tool_map.get(tc.name)
                image_msg = None
                if tool is None:
                    content = f"Error: tool '{tc.name}' not found."
                    success = False
                else:
                    log.info("tool.execute.subagent", tool=tc.name, args=tc.arguments)
                    result = await tool.execute(tc.arguments)
                    content = result.to_message_content()
                    success = result.success
                    image_msg = self._image_injection(tc.name, result)
                if sink is not None:
                    await sink.put(ToolEvent(
                        tool_name=tc.name, arguments=tc.arguments,
                        result=content, success=success, is_subagent=True,
                    ))
                context.append(Message(role="tool", content=content,
                                       tool_call_id=tc.id, name=tc.name))
                if image_msg:
                    context.append(image_msg)
        log.warning("agent.subagent.max_iterations", max_iterations=max_iterations)
        if sink is not None:
            await sink.put(SubagentComplete(token_count=_sub_tokens, tool_call_count=_sub_tool_count))
        return "[Sub-agent reached iteration limit without a final answer]"

    async def run_stream(
        self, session_id: str, user_message: str, images: list[str] | None = None
    ) -> AsyncGenerator[AgentEvent, None]:
        """
        Streaming variant. Yields AgentEvent objects:
        - PlanReady: execution plan (only when the profile enables planning)
        - ThinkingDelta / ThinkingEnd: reasoning chunks
        - ToolStarted / ToolEvent: tool call announcement + result
          (sub-agent tool events are forwarded as well)
        - TextDelta / StreamEnd: final streamed response
        - StreamStopped: the stop event interrupted the turn
        - ContextCompressed: emitted by workers after compression

        Raises:
            SessionNotFound: if *session_id* does not exist.
            MaxIterationsReached: if the iteration budget runs out.
            Any exception raised by a tool is re-raised to the consumer.
        """
        session = await self._sessions.get(session_id)
        if session is None:
            raise SessionNotFound(session_id)
        profile = self._profiles.get(session.profile_id)
        tools = self._tool_list(profile.enabled_tools)
        tool_schemas = [t.schema() for t in tools]
        llm = self._get_backend(profile.llm_backend)
        mem = await self._memory_msg()
        # Expose session_id to tools (e.g. SSH connection pool) via ContextVar
        from navi.tools.base import current_session_id as _sid_var
        # NOTE(review): the token is never used to reset the var, so the value
        # stays set in the consumer's context after the generator finishes.
        _sid_token = _sid_var.set(session_id)
        # Pre-turn compression: if the last turn filled the context past the
        # threshold, compress NOW before calling the LLM. This prevents the
        # model from seeing an over-full context and generating gibberish
        # (e.g. a "summary of the conversation" instead of a real answer).
        if (
            settings.context_compression_enabled
            and session.context_token_count > 0
            and should_compress(
                session.context_token_count,
                settings.ollama_num_ctx,
                settings.context_compression_threshold,
            )
        ):
            try:
                new_context = await compress_context(
                    context=session.context,
                    llm=llm,
                    model=profile.model,
                    temperature=settings.context_summary_temperature,
                    keep_recent=settings.context_keep_recent,
                )
                if new_context is not None:
                    count_before = len(session.context)
                    session.context = new_context
                    session.context_token_count = 0
                    log.info(
                        "agent.preturn_compress",
                        session_id=session_id,
                        before=count_before,
                        after=len(new_context),
                    )
            except Exception:
                log.warning("agent.preturn_compress_failed", session_id=session_id, exc_info=True)
        user_msg = Message(role="user", content=user_message, images=images or None,
                           created_at=datetime.now(timezone.utc))
        session.messages.append(user_msg)
        session.context.append(user_msg)
        # Persist user message immediately so it survives a client disconnect
        # before the assistant reply is ready.
        await self._sessions.save(session)
        stop_event = current_stop_event.get()
        _turn_start = time.monotonic()
        _tool_call_count = 0
        _subagent_tokens = 0
        _prev_tokens = session.context_token_count  # token baseline before this turn
        # Planning phase — runs a fast, non-streaming LLM call to produce a
        # step-by-step plan BEFORE the tool-calling loop starts. The plan is
        # injected into session.context as an assistant message so the model
        # naturally continues from it, and emitted as PlanReady for the UI.
        if profile.planning_enabled:
            for _ev in await self._run_planning(session, profile, llm, mem):
                yield _ev
        # Tool-calling loop — uses stream_complete() for every turn so thinking
        # is captured in real-time via ThinkingDelta/ThinkingEnd events.
        for iteration in range(profile.max_iterations):
            # Cooperative stop: check before each LLM call
            if stop_event and stop_event.is_set():
                await self._sessions.save(session)
                yield StreamStopped()
                return
            accumulated_text = ""
            accumulated_thinking = ""
            turn_tool_calls: list[ToolCallRequest] | None = None
            thinking_active = False
            context_tokens: int | None = None
            built_ctx = self._build_context(session.context, profile, mem)
            try:
                self._check_context_size(built_ctx)
            except ContextTooLargeError as e:
                # Surface the error as a Navi response (not a raw system error) so the
                # user sees a coherent message and the exchange is saved to history.
                error_text = str(e)
                assistant_msg = Message(role="assistant", content=error_text,
                                        created_at=datetime.now(timezone.utc))
                session.context.append(assistant_msg)
                session.messages.append(assistant_msg)
                await self._sessions.save(session)
                yield TextDelta(delta=error_text)
                # Use the same StreamEnd fields as the normal final-response
                # path (`full_content`, not `content`) so consumers handle it
                # like any other reply.
                yield StreamEnd(
                    full_content=error_text,
                    context_tokens=None,
                    max_context_tokens=settings.ollama_num_ctx,
                    elapsed_seconds=round(time.monotonic() - _turn_start, 1),
                    tool_call_count=_tool_call_count,
                    token_count=None,
                )
                return
            async for chunk in _iter_stream_guarded(
                llm.stream_complete(
                    built_ctx,
                    tools=tool_schemas if tools else None,
                    temperature=profile.temperature,
                    model=profile.model,
                ),
                stop_event=stop_event,
                first_chunk_timeout=settings.llm_stream_first_chunk_timeout,
                chunk_timeout=settings.llm_stream_chunk_timeout,
            ):
                # Cooperative stop: break cleanly so aclose() is called on the
                # generator → Ollama stream closes gracefully, model stays in VRAM.
                if stop_event and stop_event.is_set():
                    break
                if chunk.prompt_tokens is not None or chunk.completion_tokens is not None:
                    context_tokens = (chunk.prompt_tokens or 0) + (chunk.completion_tokens or 0)
                if chunk.thinking:
                    accumulated_thinking += chunk.thinking
                    if not thinking_active:
                        thinking_active = True
                    yield ThinkingDelta(delta=chunk.thinking)
                elif chunk.delta:
                    if thinking_active:
                        thinking_active = False
                        yield ThinkingEnd()
                    accumulated_text += chunk.delta
                    yield TextDelta(delta=chunk.delta)
                if chunk.tool_calls:
                    turn_tool_calls = chunk.tool_calls
                if chunk.finish_reason and thinking_active:
                    thinking_active = False
                    yield ThinkingEnd()
            # Close a reasoning block left open. The stream can end mid-thinking
            # via the break above, via the guarded iterator returning on stop,
            # or via a backend that never sends a finish_reason chunk.
            if thinking_active:
                yield ThinkingEnd()
            # Stopped mid-stream — save partial response and exit
            if stop_event and stop_event.is_set():
                if accumulated_text:
                    session.messages.append(Message(
                        role="assistant", content=accumulated_text,
                        created_at=datetime.now(timezone.utc),
                    ))
                await self._sessions.save(session)
                yield StreamStopped()
                return
            if not turn_tool_calls:
                # Final response — text already streamed above
                _elapsed = round(time.monotonic() - _turn_start, 1)
                # Net tokens = marginal cost of this turn (delta from baseline) + subagent tokens
                _net_tokens = max(0, (context_tokens or 0) - _prev_tokens) + _subagent_tokens
                assistant_msg = Message(
                    role="assistant",
                    content=accumulated_text or None,
                    thinking=accumulated_thinking or None,
                    created_at=datetime.now(timezone.utc),
                    elapsed_seconds=_elapsed,
                    tool_call_count=_tool_call_count if _tool_call_count else None,
                    token_count=_net_tokens if _net_tokens else None,
                )
                session.messages.append(assistant_msg)
                session.context.append(assistant_msg)
                session.context_token_count = context_tokens or 0
                await self._sessions.save(session)
                yield StreamEnd(
                    full_content=accumulated_text,
                    context_tokens=context_tokens,
                    max_context_tokens=settings.ollama_num_ctx,
                    elapsed_seconds=_elapsed,
                    tool_call_count=_tool_call_count,
                    token_count=_net_tokens if _net_tokens else None,
                )
                for event in await self._run_workers(session, llm, profile.model, context_tokens):
                    yield event
                return
            # Tool calls turn — record to session and execute
            assistant_msg = Message(
                role="assistant",
                content=accumulated_text or None,
                thinking=accumulated_thinking or None,
                tool_calls=turn_tool_calls,
            )
            session.messages.append(assistant_msg)
            session.context.append(assistant_msg)
            tool_map = {t.name: t for t in tools}
            for tc in turn_tool_calls:
                # 1. Announce immediately so the UI shows a pending card
                yield ToolStarted(tool_name=tc.name, arguments=tc.arguments)
                # 2. Create a sink queue for sub-agent events from this tool call.
                #    create_task() snapshots the current ContextVar values, so the
                #    task will inherit current_event_sink = sink.
                sink: asyncio.Queue = asyncio.Queue()
                sink_token = current_event_sink.set(sink)
                result_holder: list = []

                async def _run_with_sentinel(_tc=tc, _holder=result_holder, _sink=sink):
                    # Default args bind this iteration's values (avoids late binding).
                    try:
                        _holder.append(await self._run_single_tool(_tc, tool_map))
                    except BaseException as exc:
                        _holder.append(exc)
                    finally:
                        await _sink.put(_TOOL_DONE)

                tool_task = asyncio.create_task(_run_with_sentinel())
                current_event_sink.reset(sink_token)  # outer ctx restored; task has its own copy
                # 3. Block on the sink until the sentinel arrives.
                #    Sub-agent ToolStarted/ToolEvent objects come through here in real time.
                while True:
                    item = await sink.get()
                    if item is _TOOL_DONE:
                        break
                    if isinstance(item, SubagentComplete):
                        _subagent_tokens += item.token_count
                        _tool_call_count += item.tool_call_count
                    else:
                        yield item
                # 4. Re-raise tool exception or unpack result
                r = result_holder[0] if result_holder else RuntimeError("tool task produced no result")
                if isinstance(r, BaseException):
                    raise r
                tool_event, msg, image_msg = r
                # 5. Yield the completed ToolEvent and record in session
                _tool_call_count += 1
                yield tool_event
                session.messages.append(msg)
                session.context.append(msg)
                if image_msg:
                    session.context.append(image_msg)
                # 6. Cooperative stop: check after tool execution before next LLM call
                if stop_event and stop_event.is_set():
                    await self._sessions.save(session)
                    yield StreamStopped()
                    return
            # 7. If switch_profile was called this iteration, reload profile + tools.
            #    switch_profile updates the DB but run_stream() holds a local session
            #    object — without this check the final save would overwrite the change
            #    and the next LLM call would still use the old tool schemas.
            fresh = await self._sessions.get(session_id)
            if fresh and fresh.profile_id != session.profile_id:
                session.profile_id = fresh.profile_id
                profile = self._profiles.get(session.profile_id)
                tools = self._tool_list(profile.enabled_tools)
                tool_schemas = [t.schema() for t in tools]
                llm = self._get_backend(profile.llm_backend)
                log.info(
                    "agent.profile_reloaded",
                    session_id=session_id,
                    new_profile=session.profile_id,
                )
            await self._sessions.save(session)
        raise MaxIterationsReached(profile.max_iterations)

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    async def _run_planning(
        self,
        session,
        profile,
        llm: LLMBackend,
        mem: "Message | None",
    ) -> list[AgentEvent]:
        """
        Two-phase planning.

        Phase 1 — Analysis: reformulate the task, identify subtasks and unknowns.
            This is also a fast signal check: if the model answers DIRECT,
            planning is skipped entirely.
        Phase 2 — Execution plan: assign each subtask to a specific executor
            (TOOL / AGENT / SELF) using a structured format.

        The phase-2 plan is injected into session.context as an assistant message
        so the model naturally continues from it in the main loop.

        Returns [PlanReady] on success, [] on skip or failure (timeouts and
        backend errors are logged, never raised).
        """
        import re as _re
        # ── Phase 1: Task analysis ─────────────────────────────────────────────
        phase1_system = Message(
            role="system",
            content=(
                self._build_system_prompt(profile)
                + "\n\n---\n\n"
                "[PLANNING — PHASE 1: ANALYSIS]\n\n"
                "Read the user's latest request.\n\n"
                "If it is a simple question, casual conversation, or answerable in one step "
                "without tools — respond with exactly: DIRECT\n\n"
                "Otherwise analyse the request and output:\n\n"
                "TASK: [one clear sentence — what actually needs to be done]\n"
                "GOAL: [how you will know the task is complete]\n"
                "UNKNOWNS: [genuine uncertainties that could block execution, or NONE]\n"
                "SUBTASKS:\n"
                "1. [discrete unit of work]\n"
                "2. [discrete unit of work]\n\n"
                "Rules: maximum 6 subtasks. Each must be concrete and actionable. "
                "No execution yet — analysis only."
            ),
        )
        phase1_ctx: list[Message] = [phase1_system]
        if mem:
            phase1_ctx.append(mem)
        phase1_ctx.extend(m for m in session.context if m.role != "system")
        try:
            r1 = await asyncio.wait_for(
                llm.complete(phase1_ctx, tools=None, temperature=0.3, model=profile.model, think=False),
                timeout=settings.llm_complete_timeout,
            )
            analysis = (r1.content or "").strip()
        except asyncio.TimeoutError:
            log.warning("agent.planning_phase1_timeout", timeout=settings.llm_complete_timeout)
            return []
        except Exception:
            log.warning("agent.planning_phase1_failed", exc_info=True)
            return []
        if not analysis or analysis.upper().startswith("DIRECT"):
            log.debug("agent.planning_skipped", reason="direct")
            return []
        # ── Phase 2: Execution plan ────────────────────────────────────────────
        phase2_system = Message(
            role="system",
            content=(
                self._build_system_prompt(profile)
                + "\n\n---\n\n"
                "[PLANNING — PHASE 2: EXECUTION PLAN]\n\n"
                "Task analysis:\n\n"
                f"{analysis}\n\n"
                "---\n\n"
                "Now write the execution plan. For each subtask assign a specific executor:\n"
                "- TOOL: <tool_name> — a single tool call is enough\n"
                "- AGENT: <profile_id> — needs multiple steps; a subagent must handle it\n"
                "- SELF — final synthesis or a context-dependent single action only\n\n"
                "Required output format (use exactly this structure):\n\n"
                "## Plan\n\n"
                "**Task:** [reformulated task]\n"
                "**Goal:** [success criterion]\n\n"
                "**Steps:**\n"
                "1. [description] → TOOL: tool_name\n"
                "2. [description] → AGENT: profile_id\n"
                "3. [description] → SELF\n\n"
                "**Parallel:** [step numbers that can run simultaneously, or NONE]\n"
                "**Risks:** [unknowns to watch for, or NONE]\n\n"
                "Do not write prose. Do not start executing. Plan only."
            ),
        )
        # Phase 2 only needs the analysis (embedded above) and the original request.
        # Full history is intentionally excluded to keep the focus on plan structure.
        phase2_ctx: list[Message] = [phase2_system]
        if mem:
            phase2_ctx.append(mem)
        user_msgs = [m for m in session.context if m.role == "user"]
        if user_msgs:
            phase2_ctx.append(user_msgs[-1])
        try:
            r2 = await asyncio.wait_for(
                llm.complete(phase2_ctx, tools=None, temperature=0.3, model=profile.model, think=False),
                timeout=settings.llm_complete_timeout,
            )
            plan_text = (r2.content or "").strip()
        except asyncio.TimeoutError:
            log.warning("agent.planning_phase2_timeout", timeout=settings.llm_complete_timeout)
            return []
        except Exception:
            log.warning("agent.planning_phase2_failed", exc_info=True)
            return []
        if not plan_text:
            return []
        # Must have at least one numbered step
        if not _re.search(r"^\s*\d+[\.\)]", plan_text, _re.MULTILINE):
            log.debug("agent.planning_skipped", reason="no_numbered_steps")
            return []
        # Warn if executor assignments are missing (plan may be malformed)
        if not _re.search(r"(TOOL:|AGENT:|→\s*SELF)", plan_text):
            log.warning("agent.planning_no_executors", hint="plan lacks TOOL/AGENT/SELF assignments")
        # Inject plan into context so the main loop continues from it,
        # and into messages (with is_plan flag) so the UI can render a plan card after reload.
        session.context.append(Message(role="assistant", content=plan_text))
        session.messages.append(Message(role="assistant", content=plan_text, is_plan=True))
        log.debug("agent.plan_ready", phases=2, length=len(plan_text))
        return [PlanReady(plan=plan_text)]

    async def _run_workers(
        self,
        session,
        llm: LLMBackend,
        model: str,
        context_tokens: int | None,
    ) -> list[AgentEvent]:
        """Run all workers sequentially and collect their events.

        A failing worker is logged and skipped; it never aborts the others.
        """
        from navi.workers.base import WorkerContext
        ctx = WorkerContext(
            session_id=session.id,
            context_tokens=context_tokens,
            max_context_tokens=settings.ollama_num_ctx,
            llm=llm,
            model=model,
            temperature=settings.context_summary_temperature,
            session_store=self._sessions,
        )
        events: list[AgentEvent] = []
        for worker in self._workers:
            try:
                result = await worker.run(session, ctx)
                events.extend(result.events)
            except Exception:
                log.warning("agent.worker_failed",
                            worker=type(worker).__name__, exc_info=True)
        return events

    async def _memory_msg(self) -> "Message | None":
        """Return an ephemeral system message with the user memory summary, or None."""
        if not self._memory:
            return None
        summary = await self._memory.get_summary()
        if not summary:
            return None
        return Message(role="system", content=f"## What I remember about the user\n\n{summary}")

    def _build_context(
        self,
        session_context: list[Message],
        profile: "AgentProfile",
        mem: "Message | None",
    ) -> list[Message]:
        """Build the full LLM context for one call.

        The system prompt is injected fresh from the current profile every time.
        It is NOT stored in session.context, so profile switches take effect
        immediately without touching stored history.

        Memory (if any) is placed right after the system message. Any system
        messages already in session.context are stripped (migration safety).
        """
        system_msg = Message(
            role="system",
            content=self._build_system_prompt(profile),
        )
        conv = [m for m in session_context if m.role != "system"]
        result: list[Message] = [system_msg]
        if mem:
            result.append(mem)
        result.extend(conv)
        return result

    def _build_system_prompt(self, profile: "AgentProfile") -> str:
        """Compose the system prompt: persona, profile prompt, then the profiles list.

        Sections are joined with a ``---`` separator.
        """
        parts: list[str] = []
        persona = settings.navi_persona.strip()
        if persona:
            parts.append(persona)
        parts.append(profile.system_prompt)
        # Compact profiles block — every agent knows what other profiles exist
        # and when to switch. Injected dynamically so new profiles appear automatically.
        other = [p for p in self._profiles.all() if p.id != profile.id]
        if other:
            lines = [
                "## Available profiles",
                f"Current: **{profile.id}**",
            ]
            for p in other:
                desc = p.short_description or p.description
                lines.append(f"· {p.id}: {desc}")
            lines.append("→ Use switch_profile to change profile. Use list_profiles for full details before switching.")
            parts.append("\n".join(lines))
        return "\n\n---\n\n".join(parts)

    def _tool_list(self, enabled: list[str]) -> list[Tool]:
        """Resolve the profile's enabled tools plus the user's extra tools.

        Profile order is kept. User extras are appended if not already
        present. Names the registry cannot resolve are skipped (logged at
        debug level).
        """
        names = list(enabled)
        extra = _load_user_enabled_tools()
        for name in extra:
            if name not in names:
                names.append(name)
        result = []
        for name in names:
            try:
                result.append(self._tools.get(name))
            except Exception:
                # Best-effort: a missing/unregistered tool must not break the turn.
                log.debug("agent.tool_unavailable", tool=name)
        return result

    def _get_backend(self, backend_key: str) -> LLMBackend:
        return self._backends.get(backend_key)

    def _check_context_size(self, context: list[Message]) -> None:
        """Raise ContextTooLargeError before an LLM call if the context is dangerously large.

        Uses a conservative character-based estimate (~4 chars per token for text).
        Images are counted at 500 tokens each (rough vision-model estimate); a
        message carrying several images is charged for every one of them.

        Checks against the *remaining* budget, not a fixed percentage of the window:
            available_for_input = ollama_num_ctx - output_reserve
        where output_reserve is a fixed token headroom reserved for the model's
        response. This correctly accounts for sessions where conversation history
        already consumes a large portion of the window.
        """
        if not context:
            return
        output_reserve = 2048  # tokens reserved for the model's own response
        tokens_per_image = 500  # rough vision-model estimate, applied per image

        def _estimate(msgs: list[Message]) -> int:
            chars = sum(len(m.content or "") for m in msgs)
            imgs = sum(tokens_per_image * len(m.images) for m in msgs if m.images)
            return chars // 4 + imgs

        total = _estimate(context)
        available = settings.ollama_num_ctx - output_reserve
        if total > available:
            # Attribute the overflow to the newest message for a useful error.
            existing = _estimate(context[:-1])
            new = _estimate(context[-1:])
            remaining = available - existing
            raise ContextTooLargeError(
                f"Context too large: new content is ~{new:,} estimated tokens, "
                f"but only ~{max(0, remaining):,} tokens are available "
                f"(window {settings.ollama_num_ctx:,}, already used ~{existing:,}, "
                f"output_reserve {output_reserve:,}). "
                "Split the file into smaller parts or delegate to a subagent."
            )

    @staticmethod
    def _image_injection(tool_name: str, result) -> "Message | None":
        """Build the synthetic user message that hands an image tool result to the LLM.

        Returns None unless the tool succeeded, its metadata flags ``is_image``,
        and it carries a non-empty ``base64`` payload.
        """
        if not (result.success and result.metadata and result.metadata.get("is_image")):
            return None
        b64 = result.metadata.get("base64")
        if not b64:
            return None
        return Message(
            role="user",
            content=f"[Image loaded via {tool_name} — analyse it]",
            images=[b64],
        )

    async def _run_single_tool(
        self,
        tc: ToolCallRequest,
        tool_map: dict[str, Tool],
    ) -> tuple[ToolEvent, Message, "Message | None"]:
        """Execute one tool call and return (ToolEvent, tool_msg, optional_image_msg).

        Called via asyncio.create_task() from run_stream() so that the parent
        generator can drain the event sink queue concurrently. An unknown tool
        name yields an error result rather than raising.
        """
        tool = tool_map.get(tc.name)
        image_msg = None
        if tool is None:
            content = f"Error: tool '{tc.name}' not found."
            event = ToolEvent(tool_name=tc.name, arguments=tc.arguments,
                              result=content, success=False)
        else:
            log.info("tool.execute", tool=tc.name, args=tc.arguments)
            result = await tool.execute(tc.arguments)
            content = result.to_message_content()
            event = ToolEvent(tool_name=tc.name, arguments=tc.arguments,
                              result=content, success=result.success)
            image_msg = self._image_injection(tc.name, result)
        msg = Message(role="tool", content=content, tool_call_id=tc.id, name=tc.name)
        return event, msg, image_msg

    async def _execute_tool_calls(
        self, tool_calls: list[ToolCallRequest], tools: list[Tool]
    ) -> tuple[list[Message], list[Message]]:
        """Run all tool calls concurrently.

        Returns (tool_msgs, image_msgs). Tool messages keep the order of
        *tool_calls*.
        """
        tool_map = {t.name: t for t in tools}

        async def _run_one(tc: ToolCallRequest) -> tuple[Message, Message | None]:
            tool = tool_map.get(tc.name)
            image_msg = None
            if tool is None:
                content = f"Error: tool '{tc.name}' not found."
            else:
                log.info("tool.execute", tool=tc.name, args=tc.arguments)
                result = await tool.execute(tc.arguments)
                content = result.to_message_content()
                image_msg = self._image_injection(tc.name, result)
            tool_msg = Message(role="tool", content=content, tool_call_id=tc.id, name=tc.name)
            return tool_msg, image_msg

        pairs = await asyncio.gather(*[_run_one(tc) for tc in tool_calls])
        tool_msgs = [p[0] for p in pairs]
        image_msgs = [p[1] for p in pairs if p[1] is not None]
        return tool_msgs, image_msgs

    async def _execute_tool_calls_streaming(
        self, tool_calls: list[ToolCallRequest], tools: list[Tool]
    ) -> tuple[list[tuple[ToolEvent, Message]], list[Message]]:
        """Run all tool calls concurrently.

        Returns ([(ToolEvent, tool_msg)], image_msgs), with pairs ordered like
        *tool_calls*.
        """
        tool_map = {t.name: t for t in tools}

        async def _run_one(tc: ToolCallRequest) -> tuple[ToolEvent, Message, Message | None]:
            tool = tool_map.get(tc.name)
            image_msg = None
            if tool is None:
                content = f"Error: tool '{tc.name}' not found."
                event = ToolEvent(
                    tool_name=tc.name, arguments=tc.arguments, result=content, success=False
                )
            else:
                log.info("tool.execute", tool=tc.name, args=tc.arguments)
                result = await tool.execute(tc.arguments)
                content = result.to_message_content()
                event = ToolEvent(
                    tool_name=tc.name,
                    arguments=tc.arguments,
                    result=content,
                    success=result.success,
                )
                image_msg = self._image_injection(tc.name, result)
            msg = Message(role="tool", content=content, tool_call_id=tc.id, name=tc.name)
            return event, msg, image_msg

        triples = await asyncio.gather(*[_run_one(tc) for tc in tool_calls])
        pairs = [(t[0], t[1]) for t in triples]
        image_msgs = [t[2] for t in triples if t[2] is not None]
        return pairs, image_msgs