# navi-1 / navi / core / agent.py
"""
Agent: the tool-calling loop.

Flow:
1. Receive user message, load session + profile
2. Build tool schemas from profile's enabled_tools
3. Loop (up to max_iterations):
   a. Call LLM with session.context (may be compressed) + tool schemas
   b. If finish_reason == "stop"  -> stream final response
   c. If finish_reason == "tool_calls" -> execute tools, append to both
      session.messages (display history) and session.context (LLM context)
4. After StreamEnd: run workers sequentially (e.g. context compression)

session.messages — full display history, never compressed
session.context  — what the LLM sees; workers may compress this
"""

import asyncio
import json
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import TYPE_CHECKING, AsyncGenerator

import structlog

from navi.config import settings
from navi.exceptions import ContextTooLargeError, LLMBackendError, MaxIterationsReached, SessionNotFound
from navi.llm.base import LLMBackend, LLMChunk, Message, ToolCallRequest
from navi.tools.base import Tool, current_event_sink, current_stop_event

from .compressor import compress_context, should_compress
from .events import (
    AgentEvent,
    AIHelperTokensUsed,
    ContextCompressed,
    PlanningDebugData,
    PlanningStatus,
    PlanReady,
    StreamEnd,
    StreamStopped,
    SubagentComplete,
    TextDelta,
    ThinkingDelta,
    ThinkingEnd,
    ToolEvent,
    ToolStarted,
    TurnThinking,
)
from .registry import BackendRegistry, ProfileRegistry, ToolRegistry
from .session import SessionStore

if TYPE_CHECKING:
    from navi.context_providers._loader import ContextProviderRegistry
    from navi.memory.store import MemoryStore
    from navi.workers.base import Worker, WorkerContext

# Location of the "enabled.json" file inside the configured tools directory;
# read (as JSON) by _load_user_enabled_tools().
_USER_ENABLED_FILE = Path(settings.tools_dir) / "enabled.json"


async def _iter_stream_guarded(
    stream_gen: "AsyncGenerator[LLMChunk, None]",
    stop_event: "asyncio.Event | None",
    first_chunk_timeout: float,
    chunk_timeout: float,
) -> "AsyncGenerator[LLMChunk, None]":
    """
    Wraps a streaming LLM generator with two safety mechanisms:

    1. Stop-event responsiveness during prefill.
       Normally, the agent only checks stop_event *between* chunks. During the
       prefill phase (processing input tokens) Ollama emits no chunks at all —
       the first await can block for minutes on large contexts. This wrapper polls
       stop_event at least once per second so the user's Stop button works even then.

    2. Timeouts as a last-resort safety net.
       first_chunk_timeout: how long to wait for the first token (prefill).
       chunk_timeout: max gap between subsequent tokens.
       On timeout the generator is closed, which terminates the HTTP connection
       to Ollama → Ollama halts generation → GPU load drops to idle.

    Waiting time is measured with the monotonic clock (not by counting poll
    ticks), so timeouts shorter than one second are honoured and the reported
    elapsed time does not drift when the event loop is busy.

    Args:
        stream_gen: The underlying chunk generator; always closed on exit.
        stop_event: Optional cooperative stop flag; None disables stop checks.
        first_chunk_timeout: Seconds allowed before the first chunk arrives.
        chunk_timeout: Seconds allowed between any two later chunks.

    Yields:
        Every chunk produced by stream_gen, in order, until the stream ends
        or stop_event is set.

    Raises:
        LLMBackendError: No chunk arrived within the applicable timeout.
    """
    poll_interval = 1.0  # upper bound (seconds) between stop_event checks

    async def _cancel_and_drain(task: "asyncio.Future") -> None:
        """Cancel *task* and wait for it to settle, discarding its outcome."""
        task.cancel()
        try:
            await task
        except (asyncio.CancelledError, Exception):
            pass

    first = True
    chunk_task: asyncio.Task | None = None
    try:
        while True:
            timeout = first_chunk_timeout if first else chunk_timeout
            # Create one task per chunk; reuse across poll iterations so we
            # don't accidentally start multiple concurrent __anext__ calls.
            chunk_task = asyncio.ensure_future(stream_gen.__anext__())
            wait_started = time.monotonic()

            while True:
                # Never sleep past the deadline: cap the poll by the time left.
                remaining = timeout - (time.monotonic() - wait_started)
                done, _ = await asyncio.wait(
                    {chunk_task},
                    timeout=max(0.0, min(poll_interval, remaining)),
                )
                if done:
                    break
                if stop_event and stop_event.is_set():
                    await _cancel_and_drain(chunk_task)
                    chunk_task = None
                    return
                elapsed = time.monotonic() - wait_started
                if elapsed >= timeout:
                    await _cancel_and_drain(chunk_task)
                    chunk_task = None
                    label = "first token (context may be too large for this model)" if first else "next token"
                    raise LLMBackendError(
                        f"LLM timed out after {elapsed:.0f}s waiting for {label}."
                    )

            try:
                chunk = chunk_task.result()
            except StopAsyncIteration:
                # Underlying stream finished normally.
                chunk_task = None
                return

            chunk_task = None
            first = False
            yield chunk

            # The consumer may have set the stop flag while handling the chunk.
            if stop_event and stop_event.is_set():
                return

    finally:
        # Cancel any in-flight __anext__ task so we don't leave a zombie
        # coroutine holding an open HTTP connection to Ollama.
        if chunk_task is not None and not chunk_task.done():
            await _cancel_and_drain(chunk_task)
        # Closing the generator terminates the HTTP connection to Ollama,
        # which signals it to stop generating (GPU returns to idle).
        try:
            await stream_gen.aclose()
        except Exception:
            pass


def _load_user_enabled_tools() -> list[str]:
    """Return the tool names listed in the user's ``enabled.json`` file.

    Loading is best-effort: a missing, unreadable, or malformed file yields
    an empty list rather than an error. The payload is validated so the
    declared return type always holds — a top-level value that is not a JSON
    array yields ``[]``, and non-string entries inside the array are dropped.
    """
    try:
        payload = json.loads(_USER_ENABLED_FILE.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # OSError: file missing/unreadable. ValueError: invalid JSON or
        # undecodable bytes (JSONDecodeError / UnicodeDecodeError).
        return []
    if not isinstance(payload, list):
        return []
    return [name for name in payload if isinstance(name, str)]


def _parse_plan_steps(plan_text: str) -> list[str]:
    """Pull the numbered items out of a plan's ``**Steps:**`` section.

    The section starts after the ``**Steps:**`` header and runs until the next
    line that opens with a bold header beginning with an uppercase letter, or
    until the end of the text. Inside it, every ``N.`` / ``N)`` item
    contributes its (whitespace-trimmed) text. Returns ``[]`` when the plan
    has no such section or the section has no numbered items.
    """
    import re

    section = re.search(
        r'\*\*Steps:\*\*\s*\n(.*?)(?=\n\s*\*\*[A-Z]|\Z)',
        plan_text,
        re.DOTALL,
    )
    if section is None:
        return []

    parsed: list[str] = []
    for raw_step in re.findall(r'^\s*\d+[\.\)]\s*(.+)', section.group(1), re.MULTILINE):
        cleaned = raw_step.strip()
        if cleaned:
            parsed.append(cleaned)
    return parsed


# Module-level structlog logger used by the agent loops in this module.
log = structlog.get_logger()

# Sentinel: placed in the event sink by the tool wrapper to signal completion.
_TOOL_DONE = object()


def _todo_status_snapshot(session_id: str) -> frozenset[tuple[str, str]]:
    """Capture the session's todo list as a frozenset of (task_text, status) pairs.

    The anti-stall detector takes one snapshot before and one after an
    iteration; a difference between the two means some todo status changed,
    i.e. the model made real progress. Any failure while reading the todo
    store (including the import itself) produces an empty snapshot.
    """
    try:
        from navi.tools.todo import _plans

        tasks = _plans.get(session_id, [])
        return frozenset({(task.text, task.status) for task in tasks})
    except Exception:
        return frozenset()


def _todo_failed_steps(session_id: str) -> frozenset[tuple[int, str]]:
    """Collect the session's failed todo steps as (1-based position, task_text) pairs.

    Any failure while reading the todo store (including the import itself)
    produces an empty frozenset.
    """
    try:
        from navi.tools.todo import _plans

        failed: set[tuple[int, str]] = set()
        for position, task in enumerate(_plans.get(session_id, []), start=1):
            if task.status == "failed":
                failed.add((position, task.text))
        return frozenset(failed)
    except Exception:
        return frozenset()


class Agent:
    def __init__(
        self,
        session_store: "SessionStore | None",
        profile_registry: ProfileRegistry,
        tool_registry: ToolRegistry,
        backend_registry: BackendRegistry,
        workers: list["Worker"] | None = None,
        memory_store: "MemoryStore | None" = None,
        cp_registry: "ContextProviderRegistry | None" = None,
    ) -> None:
        self._sessions = session_store
        self._profiles = profile_registry
        self._tools = tool_registry
        self._backends = backend_registry
        self._workers: list["Worker"] = workers or []
        self._memory = memory_store
        self._cp_registry = cp_registry

    # ------------------------------------------------------------------
    # Public interface
    # ------------------------------------------------------------------

    async def run(self, session_id: str, user_message: str, images: list[str] | None = None) -> str:
        """Non-streaming: run the full tool-calling loop and return the final text.

        The user message, every assistant turn and every tool result are
        appended to both session.messages and session.context; synthetic
        image-injection messages go to session.context only. The session is
        saved right after the user message is recorded, after the final
        answer, and when the iteration limit is hit.

        Args:
            session_id: ID of an existing session in the session store.
            user_message: Text of the new user turn.
            images: Optional images attached to the user turn (an empty list
                is stored as None). Presumably base64 strings — TODO confirm.

        Returns:
            The assistant's final text ("" if the model returned no content).

        Raises:
            SessionNotFound: No session exists for session_id.
            MaxIterationsReached: The model was still requesting tools after
                profile.max_iterations LLM calls.
        """
        session = await self._sessions.get(session_id)
        if session is None:
            raise SessionNotFound(session_id)

        profile = self._profiles.get(session.profile_id)
        tools = self._tool_list(profile.enabled_tools)
        tool_schemas = [t.schema() for t in tools]
        llm = self._get_backend(profile.llm_backend)

        mem = await self._memory_msg()

        # Expose session_id (e.g. for the SSH connection pool) and the model to
        # tools via ContextVar. run_stream() and run_ephemeral() publish both;
        # setting the model here too keeps tools from seeing a missing or stale
        # value when they are driven through run().
        from navi.tools.base import current_session_id as _sid_var, current_model as _model_var
        _sid_var.set(session_id)
        _model_var.set(profile.model)

        user_msg = Message(role="user", content=user_message, images=images or None,
                           created_at=datetime.now(timezone.utc))
        session.messages.append(user_msg)
        session.context.append(user_msg)
        # Persist the user message before the first LLM call.
        await self._sessions.save(session)

        ctx_injections = await self._collect_context_injections(profile)
        # NOTE(review): unlike run_stream()/run_ephemeral(), this loop does not
        # call _check_context_size() before each LLM call — confirm intended.
        for iteration in range(profile.max_iterations):
            log.debug("agent.iteration", session_id=session_id, iteration=iteration)
            response = await llm.complete(
                self._build_context(session.context, profile, mem,
                                    iteration=iteration, max_iterations=profile.max_iterations,
                                    extra_system=ctx_injections),
                tools=tool_schemas if tools else None,
                temperature=profile.temperature,
                model=profile.model,
                top_k=profile.top_k,
                top_p=profile.top_p,
            )

            # Final answer: the model stopped, or it requested no tools.
            if response.finish_reason == "stop" or not response.tool_calls:
                content = response.content or ""
                assistant_msg = Message(role="assistant", content=content,
                                        created_at=datetime.now(timezone.utc))
                session.messages.append(assistant_msg)
                session.context.append(assistant_msg)
                await self._sessions.save(session)
                return content

            # Tool calls turn — append to both messages and context
            assistant_msg = Message(
                role="assistant",
                content=response.content,
                tool_calls=response.tool_calls,
            )
            session.messages.append(assistant_msg)
            session.context.append(assistant_msg)

            tool_results, image_injections = await self._execute_tool_calls(response.tool_calls, tools)
            session.messages.extend(tool_results)
            session.context.extend(tool_results)
            # Image injections are synthetic LLM helpers — context only
            session.context.extend(image_injections)

        # Iteration budget exhausted: keep what was gathered, then signal failure.
        await self._sessions.save(session)
        raise MaxIterationsReached(profile.max_iterations)

    async def run_ephemeral(
        self,
        user_message: str,
        profile_id: str,
        max_iterations: int = 40,
        exclude_tools: list[str] | None = None,
        briefing: str | None = None,
        custom_system_prompt: str | None = None,
        context_transfer: str | None = None,
        timeout_seconds: float = 300.0,
    ) -> tuple[str, bool]:
        """
        Run a sub-agent loop without a persistent session.

        Returns (result_text, completed_normally).
        completed_normally is False if the sub-agent was stopped via the stop
        event, hit the iteration limit, or timed out.

        Intended for spawning from tools (e.g. SpawnAgentTool).
        Does not touch the session store — uses a temporary in-memory context.
        Tools listed in exclude_tools are stripped from the tool list
        (use this to prevent recursion: exclude 'spawn_agent').

        System prompt structure (completely separate from the parent's system prompt):
          1. profile.subagent_system_prompt  — focused executor persona
          2. custom_system_prompt            — optional role specialisation for this task
          3. briefing                        — task context (credentials, paths, instructions)
        If none of the three is present, profile.system_prompt is used instead.

        context_transfer: text from the parent's scratchpad context_transfer section,
            injected as a priming exchange before the task message.
        timeout_seconds: wall-clock budget for the sub-agent run. It is checked
            at the start of each iteration only, so a long LLM call or tool
            execution can overrun it.

        Progress events (ToolStarted, ToolEvent, TurnThinking, SubagentComplete,
        planning events) are pushed to the parent's event sink when one is set.
        SubagentComplete carries the token usage of the most recent LLM call
        that reported usage, and the total number of tool calls made.
        """
        import uuid as _uuid
        # Give each sub-agent its own scratchpad namespace so parallel or
        # sequential sub-agents don't clobber each other's working notes.
        # NOTE(review): neither ContextVar is reset on exit, so the values stay
        # set in the calling context after this coroutine returns — confirm
        # callers run this in their own task/context or rely on that.
        from navi.tools.base import current_session_id as _sid_var, current_model as _model_var
        _sid_var.set(f"subagent_{_uuid.uuid4().hex[:12]}")

        profile = self._profiles.get(profile_id)
        _model_var.set(profile.model)
        exclude = set(exclude_tools or [])

        # Use dedicated subagent_tools if configured, else fall back to enabled_tools.
        tool_source = profile.subagent_tools if profile.subagent_tools else profile.enabled_tools
        tools = [t for t in self._tool_list(tool_source) if t.name not in exclude]
        tool_schemas = [t.schema() for t in tools]
        llm = self._get_backend(profile.llm_backend)

        mem = await self._memory_msg()

        # Build subagent system prompt — completely separate from the parent's system prompt.
        # No persona, no orchestrator instructions, no profiles block.
        # Structure: executor persona → role specialisation → task context (briefing)
        sys_parts: list[str] = []
        if profile.subagent_system_prompt:
            sys_parts.append(profile.subagent_system_prompt)
        if custom_system_prompt:
            sys_parts.append(custom_system_prompt)
        if briefing:
            sys_parts.append(f"## Task context\n\n{briefing}")
        if not sys_parts:
            # Fallback if profile has no subagent_system_prompt defined
            sys_parts.append(profile.system_prompt)
        subagent_sys_msg = Message(role="system", content="\n\n---\n\n".join(sys_parts))

        # Build initial context.
        # If context_transfer is provided, inject it as a priming exchange so the
        # sub-agent has the parent's working state from the start.
        context: list[Message] = []
        if context_transfer:
            context.append(Message(
                role="user",
                content=f"## Context from parent agent\n\n{context_transfer}",
            ))
            context.append(Message(
                role="assistant",
                content="Understood. I have the context. Ready to begin the task.",
            ))
        context.append(Message(role="user", content=user_message, created_at=datetime.now(timezone.utc)))

        # Read the event sink set by the parent run_stream() for this tool call.
        # If None (e.g. called from run(), not run_stream()), events are silently dropped.
        sink = current_event_sink.get()

        log.info("agent.subagent.start", profile_id=profile_id, max_iterations=max_iterations,
                 tools=len(tools), planning=profile.subagent_planning_enabled)

        stop_event = current_stop_event.get()
        tool_map = {t.name: t for t in tools}

        _sub_tokens: int = 0        # tokens from the most recent LLM call that reported usage
        _sub_tool_count: int = 0    # total tool calls across all iterations
        _start_time = time.monotonic()
        accumulated_text = ""

        # ── Optional planning phase ────────────────────────────────────────────
        if profile.subagent_planning_enabled:
            async for _ev in self._run_planning(
                context, profile, llm, mem, tool_schemas,
                system_prompt_override=subagent_sys_msg.content,
                is_subagent=True,
            ):
                if isinstance(_ev, AIHelperTokensUsed):
                    pass  # token accounting only, not forwarded
                elif sink is not None:
                    await sink.put(_ev)

        # ── Tool-calling loop ──────────────────────────────────────────────────
        for iteration in range(max_iterations):
            # Cooperative stop: bail out with whatever text the last turn produced.
            if stop_event and stop_event.is_set():
                return accumulated_text, False

            elapsed = time.monotonic() - _start_time
            if elapsed >= timeout_seconds:
                log.warning("agent.subagent.timeout", elapsed=elapsed, timeout=timeout_seconds)
                if sink is not None:
                    await sink.put(SubagentComplete(token_count=_sub_tokens, tool_call_count=_sub_tool_count))
                return accumulated_text or "[Sub-agent timed out]", False

            log.debug("agent.subagent.iteration", iteration=iteration)

            # Per-turn accumulators, reset for every LLM call.
            accumulated_text = ""
            accumulated_thinking = ""
            turn_tool_calls: list[ToolCallRequest] | None = None
            turn_tokens: int | None = None

            # Build context inline — no persona or profiles block for subagents.
            built_ctx: list[Message] = [subagent_sys_msg]
            if mem:
                built_ctx.append(mem)
            built_ctx.extend(m for m in context if m.role != "system")
            self._check_context_size(built_ctx)

            async for chunk in _iter_stream_guarded(
                llm.stream_complete(
                    built_ctx,
                    tools=tool_schemas if tools else None,
                    temperature=profile.temperature,
                    model=profile.model,
                    think=profile.think_enabled,
                    top_k=profile.top_k,
                    top_p=profile.top_p,
                ),
                stop_event=stop_event,
                first_chunk_timeout=settings.llm_stream_first_chunk_timeout,
                chunk_timeout=settings.llm_stream_chunk_timeout,
            ):
                if chunk.prompt_tokens is not None or chunk.completion_tokens is not None:
                    turn_tokens = (chunk.prompt_tokens or 0) + (chunk.completion_tokens or 0)
                if chunk.thinking:
                    accumulated_thinking += chunk.thinking
                if chunk.delta:
                    accumulated_text += chunk.delta
                if chunk.tool_calls:
                    turn_tool_calls = chunk.tool_calls

            if stop_event and stop_event.is_set():
                return accumulated_text, False

            # Remember the latest usage figure so the timeout / iteration-limit
            # reports carry the last LLM call's tokens instead of always 0.
            if turn_tokens is not None:
                _sub_tokens = turn_tokens

            if not turn_tool_calls:
                # No tools requested: this turn's text is the final answer.
                log.info("agent.subagent.complete", iterations=iteration + 1,
                         result_len=len(accumulated_text))
                _sub_tokens = turn_tokens or 0
                if sink is not None:
                    await sink.put(SubagentComplete(
                        token_count=_sub_tokens,
                        tool_call_count=_sub_tool_count,
                    ))
                return accumulated_text, True

            # Emit accumulated thinking before tool calls
            if accumulated_thinking and sink is not None:
                log.debug("agent.subagent.turn_thinking", length=len(accumulated_thinking))
                await sink.put(TurnThinking(thinking=accumulated_thinking, is_subagent=True))

            context.append(Message(
                role="assistant",
                content=accumulated_text or None,
                tool_calls=turn_tool_calls,
            ))

            # Execute each tool call sequentially, emitting events to parent sink
            for tc in turn_tool_calls:
                _sub_tool_count += 1
                if sink is not None:
                    await sink.put(ToolStarted(
                        tool_name=tc.name, arguments=tc.arguments, is_subagent=True
                    ))

                tool = tool_map.get(tc.name)
                image_msg = None
                if tool is None:
                    # Unknown (or excluded) tool: report the error back to the model.
                    content = f"Error: tool '{tc.name}' not found."
                    success = False
                else:
                    log.info("tool.execute.subagent", tool=tc.name, args=tc.arguments)
                    result = await tool.execute(tc.arguments)
                    content = result.to_message_content()
                    success = result.success
                    # Image results are re-injected as a synthetic user message
                    # so the model receives the image itself.
                    if result.success and result.metadata and result.metadata.get("is_image"):
                        b64 = result.metadata.get("base64")
                        if b64:
                            image_msg = Message(
                                role="user",
                                content=f"[Image loaded via {tc.name} — analyse it]",
                                images=[b64],
                            )

                if sink is not None:
                    await sink.put(ToolEvent(
                        tool_name=tc.name, arguments=tc.arguments,
                        result=content, success=success, is_subagent=True,
                    ))

                context.append(Message(role="tool", content=content,
                                       tool_call_id=tc.id, name=tc.name))
                if image_msg:
                    context.append(image_msg)

        log.warning("agent.subagent.max_iterations", max_iterations=max_iterations)
        if sink is not None:
            await sink.put(SubagentComplete(token_count=_sub_tokens, tool_call_count=_sub_tool_count))
        return accumulated_text or "[Sub-agent reached iteration limit without a final answer]", False

    async def run_stream(
        self, session_id: str, user_message: str, images: list[str] | None = None
    ) -> AsyncGenerator[AgentEvent, None]:
        """
        Streaming variant. Yields AgentEvent objects:
        - ThinkingDelta / ThinkingEnd: reasoning chunks
        - ToolEvent: tool call + result
        - TextDelta / StreamEnd: final streamed response
        - ContextCompressed: emitted by workers after compression
        """
        session = await self._sessions.get(session_id)
        if session is None:
            raise SessionNotFound(session_id)

        profile = self._profiles.get(session.profile_id)
        tools = self._tool_list(profile.enabled_tools)
        tool_schemas = [t.schema() for t in tools]
        llm = self._get_backend(profile.llm_backend)

        mem = await self._memory_msg()

        # Expose session_id and model to tools via ContextVar
        from navi.tools.base import current_session_id as _sid_var, current_model as _model_var
        _sid_token = _sid_var.set(session_id)
        _model_var.set(profile.model)

        # Pre-turn compression: if the last turn filled the context past the
        # threshold, compress NOW before calling the LLM.  This prevents the
        # model from seeing an over-full context and generating gibberish
        # (e.g. a "summary of the conversation" instead of a real answer).
        if (
            settings.context_compression_enabled
            and session.context_token_count > 0
            and should_compress(
                session.context_token_count,
                settings.ollama_num_ctx,
                settings.context_compression_threshold,
            )
        ):
            try:
                result = await compress_context(
                    context=session.context,
                    llm=llm,
                    model=profile.model,
                    temperature=settings.context_summary_temperature,
                    keep_recent=settings.context_keep_recent,
                    max_tokens=settings.context_summary_max_tokens,
                )
                if result is not None:
                    new_context, summary_text = result
                    count_before = len(session.context)
                    session.context = new_context
                    session.context_token_count = 0
                    session.messages.append(Message(
                        role="system",
                        is_compression=True,
                        content=summary_text,
                    ))
                    log.info(
                        "agent.preturn_compress",
                        session_id=session_id,
                        before=count_before,
                        after=len(new_context),
                    )
                    yield ContextCompressed(
                        messages_before=count_before,
                        messages_after=len(new_context),
                        summary=summary_text,
                    )
            except Exception:
                log.warning("agent.preturn_compress_failed", session_id=session_id, exc_info=True)

        user_msg = Message(role="user", content=user_message, images=images or None,
                           created_at=datetime.now(timezone.utc))
        session.messages.append(user_msg)
        session.context.append(user_msg)
        # Persist user message immediately so it survives a client disconnect
        # before the assistant reply is ready.
        await self._sessions.save(session)

        stop_event = current_stop_event.get()
        _turn_start = time.monotonic()
        _tool_call_count = 0
        _subagent_tokens = 0
        _prev_tokens = session.context_token_count  # token baseline before this turn

        # Planning phase — always runs on the first user message in a session;
        # on subsequent messages uses the profile's planning_enabled flag.
        # force_plan suppresses the DIRECT shortcut: first message is always forced,
        # and planning_mandatory extends that to every subsequent message.
        _is_first_message = sum(1 for m in session.messages if m.role == "user") == 1
        _force_plan = _is_first_message or profile.planning_mandatory
        if _is_first_message or profile.planning_enabled:
            async for _ev in self._run_planning(session.context, profile, llm, mem, tool_schemas, messages=session.messages, force_plan=_force_plan):
                if isinstance(_ev, AIHelperTokensUsed):
                    _subagent_tokens += _ev.total
                elif isinstance(_ev, PlanningDebugData):
                    session.planning_logs.append(_ev.log)
                else:
                    yield _ev

        # Anti-stall state — tracks consecutive iterations without progress.
        # Two independent signals: no todo status change, and repeated identical tool calls.
        _stall_no_todo = 0          # iterations since last todo status change
        _stall_repeat_tools = 0     # iterations with identical tool calls as the previous turn
        _prev_tool_sigs: frozenset[tuple[str, str]] = frozenset()

        # Adaptive re-plan state — detect newly-failed todo steps and inject a
        # re-planning prompt on the following iteration so the model revises its plan.
        _known_failed: frozenset[tuple[int, str]] = frozenset()
        _replan_msg: str | None = None

        ctx_injections = await self._collect_context_injections(profile)

        # Tool-calling loop — uses stream_complete() for every turn so thinking
        # is captured in real-time via ThinkingDelta/ThinkingEnd events.
        for iteration in range(profile.max_iterations):
            # Cooperative stop: check before each LLM call
            if stop_event and stop_event.is_set():
                await self._sessions.save(session)
                yield StreamStopped()
                return

            accumulated_text = ""
            accumulated_thinking = ""
            turn_tool_calls: list[ToolCallRequest] | None = None
            thinking_active = False
            context_tokens: int | None = None

            built_ctx = self._build_context(session.context, profile, mem,
                                            iteration=iteration, max_iterations=profile.max_iterations,
                                            extra_system=ctx_injections)

            if (
                profile.goal_anchoring_enabled
                and iteration > 0
                and iteration % profile.goal_anchoring_interval == 0
            ):
                built_ctx.append(self._build_goal_anchor(session_id, user_message))

            if iteration == 0:
                try:
                    from navi.tools.todo import _plans, _STATUS_ICON
                    tasks = _plans.get(session_id, [])
                    if tasks:
                        lines = ["[Todo — track your progress]"]
                        for i, t in enumerate(tasks):
                            icon = _STATUS_ICON.get(t.status, "?")
                            lines.append(f"  {icon} [{i}] {t.text} ({t.status})")
                        lines.append("Mark each step in_progress when you start it, done when complete.")
                        built_ctx.append(Message(role="system", content="\n".join(lines)))
                except Exception:
                    pass

            # Snapshot todo state before this iteration (for stall detection after)
            _todo_snapshot_before = _todo_status_snapshot(session_id)

            # Adaptive re-plan: inject queued re-plan message from previous iteration
            if profile.adaptive_replan_enabled and _replan_msg:
                built_ctx.append(Message(role="system", content=_replan_msg))
                _replan_msg = None

            if profile.anti_stall_enabled and iteration > 0:
                stalled = (
                    _stall_no_todo >= profile.anti_stall_threshold
                    or _stall_repeat_tools >= profile.anti_stall_threshold
                )
                if stalled:
                    reason = (
                        f"no todo progress for {_stall_no_todo} iterations"
                        if _stall_no_todo >= profile.anti_stall_threshold
                        else f"identical tool calls repeated {_stall_repeat_tools} times"
                    )
                    built_ctx.append(Message(
                        role="system",
                        content=(
                            f"[Anti-stall warning — {reason}] "
                            "You are repeating the same actions without making progress. "
                            "Stop and reconsider: change your approach, try a different tool, "
                            "mark the current step as failed and move on, or ask the user for guidance."
                        ),
                    ))

            try:
                self._check_context_size(built_ctx)
            except ContextTooLargeError as e:
                # Surface the error as a Navi response (not a raw system error) so the
                # user sees a coherent message and the exchange is saved to history.
                error_text = str(e)
                assistant_msg = Message(role="assistant", content=error_text)
                session.context.append(assistant_msg)
                session.messages.append(assistant_msg)
                await self._sessions.save(session)
                yield TextDelta(delta=error_text)
                yield StreamEnd(content=error_text)
                return

            async for chunk in _iter_stream_guarded(
                llm.stream_complete(
                    built_ctx,
                    tools=tool_schemas if tools else None,
                    temperature=profile.temperature,
                    model=profile.model,
                    think=profile.think_enabled,
                    top_k=profile.top_k,
                    top_p=profile.top_p,
                ),
                stop_event=stop_event,
                first_chunk_timeout=settings.llm_stream_first_chunk_timeout,
                chunk_timeout=settings.llm_stream_chunk_timeout,
            ):
                # Cooperative stop: break cleanly so aclose() is called on the
                # generator → Ollama stream closes gracefully, model stays in VRAM.
                if stop_event and stop_event.is_set():
                    if thinking_active:
                        yield ThinkingEnd()
                    break
                if chunk.prompt_tokens is not None or chunk.completion_tokens is not None:
                    context_tokens = (chunk.prompt_tokens or 0) + (chunk.completion_tokens or 0)
                if chunk.thinking:
                    accumulated_thinking += chunk.thinking
                    if not thinking_active:
                        thinking_active = True
                    yield ThinkingDelta(delta=chunk.thinking)
                elif chunk.delta:
                    if thinking_active:
                        thinking_active = False
                        yield ThinkingEnd()
                    accumulated_text += chunk.delta
                    yield TextDelta(delta=chunk.delta)
                if chunk.tool_calls:
                    turn_tool_calls = chunk.tool_calls
                if chunk.finish_reason and thinking_active:
                    thinking_active = False
                    yield ThinkingEnd()

            # Stopped mid-stream — save partial response and exit
            if stop_event and stop_event.is_set():
                if accumulated_text:
                    session.messages.append(Message(
                        role="assistant", content=accumulated_text,
                        created_at=datetime.now(timezone.utc),
                    ))
                await self._sessions.save(session)
                yield StreamStopped()
                return

            if not turn_tool_calls:
                # Final response — text already streamed above
                _elapsed = round(time.monotonic() - _turn_start, 1)
                # Net tokens = marginal cost of this turn (delta from baseline) + subagent tokens
                _net_tokens = max(0, (context_tokens or 0) - _prev_tokens) + _subagent_tokens
                assistant_msg = Message(
                    role="assistant",
                    content=accumulated_text or None,
                    thinking=accumulated_thinking or None,
                    created_at=datetime.now(timezone.utc),
                    elapsed_seconds=_elapsed,
                    tool_call_count=_tool_call_count if _tool_call_count else None,
                    token_count=_net_tokens if _net_tokens else None,
                )
                session.messages.append(assistant_msg)
                session.context.append(assistant_msg)
                session.context_token_count = context_tokens or 0
                await self._sessions.save(session)

                yield StreamEnd(
                    full_content=accumulated_text,
                    context_tokens=context_tokens,
                    max_context_tokens=settings.ollama_num_ctx,
                    elapsed_seconds=_elapsed,
                    tool_call_count=_tool_call_count,
                    token_count=_net_tokens if _net_tokens else None,
                )

                for event in await self._run_workers(session, llm, profile.model, context_tokens):
                    yield event
                return

            # Tool calls turn — record to session and execute
            assistant_msg = Message(
                role="assistant",
                content=accumulated_text or None,
                thinking=accumulated_thinking or None,
                tool_calls=turn_tool_calls,
            )
            session.messages.append(assistant_msg)
            session.context.append(assistant_msg)

            tool_map = {t.name: t for t in tools}
            for tc in turn_tool_calls:
                # 1. Announce immediately so the UI shows a pending card
                yield ToolStarted(tool_name=tc.name, arguments=tc.arguments)

                # 2. Create a sink queue for sub-agent events from this tool call.
                #    create_task() snapshots the current ContextVar values, so the
                #    task will inherit current_event_sink = sink.
                sink: asyncio.Queue = asyncio.Queue()
                sink_token = current_event_sink.set(sink)
                result_holder: list = []

                async def _run_with_sentinel(_tc=tc, _holder=result_holder, _sink=sink):
                    try:
                        _holder.append(await self._run_single_tool(_tc, tool_map))
                    except BaseException as exc:
                        _holder.append(exc)
                    finally:
                        await _sink.put(_TOOL_DONE)

                tool_task = asyncio.create_task(_run_with_sentinel())
                current_event_sink.reset(sink_token)  # outer ctx restored; task has its own copy

                # 3. Block on the sink until the sentinel arrives.
                #    Sub-agent ToolStarted/ToolEvent objects come through here in real time.
                while True:
                    item = await sink.get()
                    if item is _TOOL_DONE:
                        break
                    if isinstance(item, SubagentComplete):
                        _subagent_tokens += item.token_count
                        _tool_call_count += item.tool_call_count
                    elif isinstance(item, AIHelperTokensUsed):
                        _subagent_tokens += item.total
                    else:
                        yield item

                # 4. Re-raise tool exception or unpack result
                r = result_holder[0] if result_holder else RuntimeError("tool task produced no result")
                if isinstance(r, BaseException):
                    raise r
                tool_event, msg, image_msg = r

                # 5. Yield the completed ToolEvent and record in session
                _tool_call_count += 1
                yield tool_event
                session.messages.append(msg)
                session.context.append(msg)
                if image_msg:
                    session.context.append(image_msg)

            # 6. Cooperative stop: check after tool execution before next LLM call
            if stop_event and stop_event.is_set():
                await self._sessions.save(session)
                yield StreamStopped()
                return

            # Update anti-stall counters after all tools in this iteration ran.
            if profile.anti_stall_enabled:
                # Todo progress signal
                if _todo_status_snapshot(session_id) != _todo_snapshot_before:
                    _stall_no_todo = 0
                else:
                    _stall_no_todo += 1

                # Repeated tool call signal
                cur_sigs = frozenset(
                    (tc.name, json.dumps(tc.arguments, sort_keys=True))
                    for tc in (turn_tool_calls or [])
                )
                if cur_sigs and cur_sigs == _prev_tool_sigs:
                    _stall_repeat_tools += 1
                else:
                    _stall_repeat_tools = 0
                _prev_tool_sigs = cur_sigs

            # Adaptive re-plan: detect steps that were newly marked failed this iteration.
            if profile.adaptive_replan_enabled:
                current_failed = _todo_failed_steps(session_id)
                new_failures = current_failed - _known_failed
                _known_failed = current_failed
                if new_failures:
                    failed_labels = ", ".join(
                        f'step {idx} ("{text}")'
                        for idx, text in sorted(new_failures)
                    )
                    _replan_msg = (
                        f"[Adaptive re-plan] {failed_labels} just failed. "
                        "Before continuing, revise your plan: call todo(op=\"set\") or todo(op=\"update\") "
                        "to replace or skip the remaining pending steps with a revised approach "
                        "that accounts for what went wrong. Then continue execution."
                    )
                    log.info("agent.adaptive_replan_queued", failures=len(new_failures),
                             session_id=session_id)

            # 7. If switch_profile was called this iteration, reload profile + tools.
            #    switch_profile updates the DB but run_stream() holds a local session
            #    object — without this check the final save would overwrite the change
            #    and the next LLM call would still use the old tool schemas.
            fresh = await self._sessions.get(session_id)
            if fresh and fresh.profile_id != session.profile_id:
                session.profile_id = fresh.profile_id
                profile = self._profiles.get(session.profile_id)
                tools = self._tool_list(profile.enabled_tools)
                tool_schemas = [t.schema() for t in tools]
                llm = self._get_backend(profile.llm_backend)
                log.info(
                    "agent.profile_reloaded",
                    session_id=session_id,
                    new_profile=session.profile_id,
                )

        await self._sessions.save(session)
        raise MaxIterationsReached(profile.max_iterations)

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    async def _run_planning(
        self,
        context: "list[Message]",
        profile,
        llm: LLMBackend,
        mem: "Message | None",
        tool_schemas: list | None = None,
        messages: "list[Message] | None" = None,
        system_prompt_override: str | None = None,
        is_subagent: bool = False,
        force_plan: bool = False,
    ):
        """
        Planning pipeline (async generator).

        Phase 1 — Analysis (think=profile.think_enabled, temperature 0.3):
                   reformulate the task, identify subtasks and unknowns. The model
                   may answer DIRECT for simple requests (unless force_plan is set)
                   and emits REFLECT: yes/no to signal whether a multi-perspective
                   review is warranted. Runs only when
                   profile.planning_phase1_enabled is true.
        Phase 2 — Multi-perspective review (conditional, think=False, concurrent):
                   three advisors (Critic / Pragmatist / Detailer) independently
                   critique the phase 1 analysis. Runs only when
                   profile.planning_phase2_enabled is true AND phase 1 output
                   matches "REFLECT: yes" AND this is not a subagent.
        Phase 3 — Execution plan (think=False): assigns each subtask to
                   TOOL / AGENT / SELF. If phase 2 produced feedback it is
                   embedded in the prompt. Runs only when
                   profile.planning_phase3_enabled is true.

        Args:
            context: LLM-facing conversation. Non-system messages are passed to
                phases 1 and 2; only the last user message is passed to phase 3.
                On success the plan (assistant) and an "execute now" system
                message are appended to this list in place.
            profile: Agent profile; read for the planning_phase*_enabled flags,
                think_enabled and model.
            llm: Backend whose ``complete`` coroutine is awaited for every phase.
            mem: Optional memory message placed right after each phase's
                system prompt.
            tool_schemas: Optional tool schemas used to render the compact
                "Available tools" block embedded in the phase 1 and 3 prompts.
            messages: Optional display history; on success the plan is appended
                with ``is_plan=True``.
            system_prompt_override: Base system prompt to use instead of
                ``self._build_system_prompt(profile)`` when not None.
            is_subagent: Forwarded on PlanningStatus / PlanReady; when true,
                phase 2 is skipped and PlanningDebugData is never yielded.
            force_plan: When true, the DIRECT escape hatch is omitted from the
                phase 1 prompt.

        Yields:
            PlanningStatus before each phase that runs, AIHelperTokensUsed for
            the token usage of helper LLM calls, PlanningDebugData (main agent
            only) when the pipeline finishes or bails out on DIRECT / timeout /
            error / empty plan, and finally PlanReady when a plan was produced.
            PlanReady is not yielded when planning is skipped, fails, or is
            stopped; a cooperative stop returns without PlanningDebugData.
        """
        import re as _re

        # ── Build compact tool list for the Phase 1 / Phase 3 prompts ─────────
        # A schema may be an object exposing `.function` or a plain dict with a
        # "function" key. Only the first description line (max 80 chars) is kept.
        if tool_schemas:
            tool_lines = []
            for schema in tool_schemas:
                fn = schema.function if hasattr(schema, "function") else schema.get("function", {})
                name = fn.get("name", "")
                desc = (fn.get("description") or "").split("\n")[0][:80]
                tool_lines.append(f"  - {name}: {desc}")
            available_tools_block = (
                "Available tools (use these exact names for TOOL: assignments):\n"
                + "\n".join(tool_lines)
                + "\n\n"
            )
            # NOTE(review): tool_names_list is not read anywhere else in this method.
            tool_names_list = ", ".join(
                (schema.function if hasattr(schema, "function") else schema.get("function", {})).get("name", "")
                for schema in tool_schemas
            )
        else:
            available_tools_block = ""
            tool_names_list = ""

        # Read stop event once — checked between all phases
        _stop = current_stop_event.get()

        # Debug log — collected across all phases, yielded at the end (main agent only)
        _dbg: dict = {"timestamp": datetime.now(timezone.utc).isoformat(), "result": "plan", "phases": {}}

        # Base system prompt shared by every phase; each phase appends its own section.
        _base_sys = system_prompt_override if system_prompt_override is not None else self._build_system_prompt(profile)

        # ── Phase 1: Task analysis (with reasoning) ────────────────────────────
        # `analysis` stays empty when phase 1 is disabled; later phases then run
        # with an empty analysis section.
        analysis: str = ""
        if profile.planning_phase1_enabled:
            yield PlanningStatus(phase=1, label="Working on it...", is_subagent=is_subagent)
            phase1_system = Message(
                role="system",
                content=(
                    _base_sys
                    + "\n\n---\n\n"
                    "[PLANNING — PHASE 1: ANALYSIS]\n\n"
                    "Read the user's latest request.\n\n"
                    + (
                        ""
                        if force_plan else
                        "If it is a simple question, casual conversation, or answerable in one step "
                        "without tools — respond with exactly: DIRECT\n\n"
                    )
                    + available_tools_block
                    + "Analyse the request and output:\n\n"
                    "TASK: [one clear sentence — what actually needs to be done]\n"
                    "GOAL: [how you will know the task is complete]\n"
                    "UNKNOWNS: [genuine uncertainties that could block execution, or NONE]\n"
                    "RESOURCES:\n"
                    "- [tool_name]: [what it does] — [limitation if any] — [alternative if limitation blocks the goal]\n"
                    "- context sources: [which of memory / NAVI.md / web you will check and why]\n"
                    "SUBTASKS:\n"
                    "1. [discrete unit of work]\n"
                    "2. [discrete unit of work]\n"
                    "ATOMICITY: For each subtask that requires multiple actions — if it fails halfway, "
                    "is any partial result still useful? If not, split it into smaller steps where "
                    "each one delivers an independent, usable result on its own.\n"
                    "REFLECT: yes — if the task is complex (multiple unknowns, external APIs, "
                    "research required, or high-stakes/irreversible actions); "
                    "no — if it is straightforward and the path is clear.\n"
                    "COMMITMENTS: [follow the plan step by step using the todo tool; gather any missing context independently without asking the user]\n\n"
                    "Rules: maximum 6 subtasks. Each must be concrete and actionable. "
                    "No execution yet — analysis only."
                ),
            )
            # Phase 1 sees the whole conversation minus stored system messages.
            phase1_ctx: list[Message] = [phase1_system]
            if mem:
                phase1_ctx.append(mem)
            phase1_ctx.extend(m for m in context if m.role != "system")

            # Any failure here abandons planning entirely (no plan is injected).
            try:
                r1 = await asyncio.wait_for(
                    llm.complete(phase1_ctx, tools=None, temperature=0.3, model=profile.model, think=profile.think_enabled),
                    timeout=settings.llm_complete_timeout,
                )
                analysis = (r1.content or "").strip()
            except asyncio.TimeoutError:
                log.warning("agent.planning_phase1_timeout", timeout=settings.llm_complete_timeout)
                _dbg["result"] = "phase1_timeout"
                if not is_subagent:
                    yield PlanningDebugData(log=_dbg)
                return
            except Exception:
                log.warning("agent.planning_phase1_failed", exc_info=True)
                _dbg["result"] = "phase1_error"
                if not is_subagent:
                    yield PlanningDebugData(log=_dbg)
                return

            # Report helper-call token usage so the caller can account for it.
            if r1.prompt_tokens or r1.completion_tokens:
                yield AIHelperTokensUsed(
                    prompt_tokens=r1.prompt_tokens or 0,
                    completion_tokens=r1.completion_tokens or 0,
                )

            _dbg["phases"]["1"] = {
                "output": analysis,
                "prompt_tokens": r1.prompt_tokens or 0,
                "completion_tokens": r1.completion_tokens or 0,
            }

            # Empty output or a DIRECT verdict means no plan is needed.
            if not analysis or analysis.upper().startswith("DIRECT"):
                log.debug("agent.planning_skipped", reason="direct")
                _dbg["result"] = "direct"
                if not is_subagent:
                    yield PlanningDebugData(log=_dbg)
                return

            # Cooperative stop: exit quietly, without debug data.
            if _stop and _stop.is_set():
                log.debug("agent.planning_stopped", phase=1)
                return
        else:
            log.debug("agent.planning_phase1_skipped")

        # ── Phase 2: Multi-perspective review (conditional) ────────────────────
        # Runs only when planning_phase2_enabled=True AND phase 1 signals
        # that the task is complex enough to warrant independent critique
        # AND this is the main agent (subagents never consult advisors).
        advisor_feedback: str = ""
        needs_reflect = bool(_re.search(r"REFLECT\s*:\s*yes", analysis, _re.IGNORECASE))

        if profile.planning_phase2_enabled and needs_reflect and not is_subagent:
            yield PlanningStatus(phase=2, label="Consulting advisors...", is_subagent=is_subagent)

            # (display name, role prompt) pairs — one LLM call per advisor.
            _ADVISORS = [
                (
                    "Critic",
                    "You are the Critic advisor. Your role: identify what could go wrong with this plan. "
                    "Look for untested assumptions, overlooked risks, missing error handling, and steps that "
                    "might fail silently. Be specific — name the exact step and the exact risk. "
                    "Do NOT suggest changing the user's goal. Critique the plan, not the objective.",
                ),
                (
                    "Pragmatist",
                    "You are the Pragmatist advisor. Your role: find a simpler, more direct path. "
                    "Identify steps that are unnecessary, can be merged, or parallelised. "
                    "Flag steps where the chosen executor (TOOL/AGENT/SELF) is suboptimal. "
                    "Do NOT suggest changing the user's goal. Improve efficiency, not direction.",
                ),
                (
                    "Detailer",
                    "You are the Detailer advisor. Your role: find what is missing. "
                    "Identify prerequisites not listed, edge cases unaddressed, outputs not specified, "
                    "and validation steps absent. Be concrete — add what is needed, do not restate what is there. "
                    "Do NOT suggest changing the user's goal. Complete the plan, do not redirect it.",
                ),
            ]

            async def _run_advisor(name: str, role_prompt: str) -> tuple[str, str, int, int]:
                # Returns (advisor name, critique text, prompt tokens, completion tokens).
                # Never raises for ordinary errors: a failed advisor yields ("", 0, 0).
                adv_system = Message(
                    role="system",
                    content=(
                        _base_sys
                        + "\n\n---\n\n"
                        f"[PLANNING — PHASE 2: ADVISOR — {name.upper()}]\n\n"
                        + role_prompt
                        + "\n\n---\n\n"
                        "The task analysis below was produced in phase 1. "
                        "Review it and provide your concise critique (3–7 bullet points max). "
                        "Speak directly to the plan — no preamble, no conclusion.\n\n"
                        f"PHASE 1 ANALYSIS:\n{analysis}"
                    ),
                )
                # Full chat context so advisors have complete picture of the conversation
                adv_ctx: list[Message] = [adv_system]
                if mem:
                    adv_ctx.append(mem)
                adv_ctx.extend(m for m in context if m.role != "system")
                try:
                    r = await asyncio.wait_for(
                        llm.complete(adv_ctx, tools=None, temperature=0.4, model=profile.model, think=False),
                        timeout=settings.llm_complete_timeout,
                    )
                    return name, (r.content or "").strip(), r.prompt_tokens or 0, r.completion_tokens or 0
                except Exception:
                    log.warning("agent.planning_advisor_failed", advisor=name, exc_info=True)
                    return name, "", 0, 0

            # All advisors are awaited concurrently; a failure of the gather itself
            # degrades to "no feedback" rather than aborting planning.
            try:
                advisor_results = await asyncio.gather(*[
                    _run_advisor(name, prompt) for name, prompt in _ADVISORS
                ])
            except Exception:
                log.warning("agent.planning_reflect_failed", exc_info=True)
                advisor_results = []

            _dbg["phases"]["2"] = {}
            feedback_parts: list[str] = []
            for name, output, pt, ct in advisor_results:
                # Only advisors that produced text contribute feedback and token usage.
                if output:
                    feedback_parts.append(f"### {name}\n{output}")
                    yield AIHelperTokensUsed(prompt_tokens=pt, completion_tokens=ct)
                _dbg["phases"]["2"][name] = {"output": output, "prompt_tokens": pt, "completion_tokens": ct}

            if feedback_parts:
                advisor_feedback = "\n\n".join(feedback_parts)
                log.debug("agent.planning_reflect_done", advisors=len(feedback_parts))

            # Cooperative stop: exit quietly, without debug data.
            if _stop and _stop.is_set():
                log.debug("agent.planning_stopped", phase=2)
                return

        # ── Phase 3: Execution plan ────────────────────────────────────────────
        if not profile.planning_phase3_enabled:
            log.debug("agent.planning_phase3_skipped")
            _dbg["result"] = "phase1_only"
            if not is_subagent:
                yield PlanningDebugData(log=_dbg)
            return

        yield PlanningStatus(phase=3, label="Building execution plan...", is_subagent=is_subagent)

        # Empty when phase 2 did not run or produced no usable feedback.
        advisor_block = (
            "Advisor feedback from multi-perspective review — address these points in your plan:\n\n"
            + advisor_feedback
            + "\n\n---\n\n"
        ) if advisor_feedback else ""

        phase3_system = Message(
            role="system",
            content=(
                _base_sys
                + "\n\n---\n\n"
                "[PLANNING — PHASE 3: EXECUTION PLAN]\n\n"
                "Task analysis:\n\n"
                f"{analysis}\n\n"
                "---\n\n"
                + advisor_block
                + available_tools_block
                + "Now write the execution plan. For each subtask assign a specific executor:\n"
                "- TOOL: <tool_name>  — a single tool call is enough; use exact tool names from the list above\n"
                "- AGENT: <profile_id>  — needs 2+ tool calls; one subagent handles this ONE step\n"
                "- SELF  — final synthesis or a context-dependent single action only\n\n"
                "AGENT scoping rules (critical):\n"
                "- Each AGENT step is one focused, independently verifiable unit of work.\n"
                "- One AGENT step = one spawn_agent call later. Do NOT bundle multiple concerns.\n"
                "- Comma test: if your step description lists things with 'and' or commas, "
                "each item is a separate step.\n"
                "- Good: 'Research X pricing from 3 sources' | 'Audit SSH config on host Y'\n"
                "- Bad: 'Research everything and write the full report' (too broad — split it)\n\n"
                "Required output format (use exactly this structure):\n\n"
                "## Plan\n\n"
                "**Task:** [reformulated task]\n"
                "**Goal:** [success criterion]\n\n"
                "**Steps:**\n"
                "1. [description] → TOOL: tool_name\n"
                "2. [description] → AGENT: profile_id\n"
                "3. [description] → AGENT: profile_id\n"
                "4. [description] → SELF\n\n"
                "**Parallel:** [step numbers that can run simultaneously, or NONE]\n"
                "**Risks:** [unknowns to watch for, or NONE]\n\n"
                "Do not write prose. Do not start executing. Plan only."
            ),
        )
        # Phase 3 only needs the analysis (embedded above) and the original request.
        # Full history is intentionally excluded to keep the focus on plan structure.
        phase3_ctx: list[Message] = [phase3_system]
        if mem:
            phase3_ctx.append(mem)
        user_msgs = [m for m in context if m.role == "user"]
        if user_msgs:
            phase3_ctx.append(user_msgs[-1])

        # Any failure here abandons planning (no plan is injected into context).
        try:
            r2 = await asyncio.wait_for(
                llm.complete(phase3_ctx, tools=None, temperature=0.3, model=profile.model, think=False),
                timeout=settings.llm_complete_timeout,
            )
            plan_text = (r2.content or "").strip()
        except asyncio.TimeoutError:
            log.warning("agent.planning_phase3_timeout", timeout=settings.llm_complete_timeout)
            _dbg["result"] = "phase3_timeout"
            if not is_subagent:
                yield PlanningDebugData(log=_dbg)
            return
        except Exception:
            log.warning("agent.planning_phase3_failed", exc_info=True)
            _dbg["result"] = "phase3_error"
            if not is_subagent:
                yield PlanningDebugData(log=_dbg)
            return

        # Report helper-call token usage so the caller can account for it.
        if r2.prompt_tokens or r2.completion_tokens:
            yield AIHelperTokensUsed(
                prompt_tokens=r2.prompt_tokens or 0,
                completion_tokens=r2.completion_tokens or 0,
            )

        _dbg["phases"]["3"] = {
            "output": plan_text,
            "prompt_tokens": r2.prompt_tokens or 0,
            "completion_tokens": r2.completion_tokens or 0,
        }

        if not plan_text:
            _dbg["result"] = "empty_plan"
            if not is_subagent:
                yield PlanningDebugData(log=_dbg)
            return

        # Warn if no numbered steps but still use the plan
        if not _re.search(r"^\s*\d+[\.\)]", plan_text, _re.MULTILINE):
            log.warning("agent.planning_no_numbered_steps", plan_preview=plan_text[:200])

        # Cooperative stop: exit before the plan is injected anywhere.
        if _stop and _stop.is_set():
            log.debug("agent.planning_stopped", phase=3)
            return

        # Warn if executor assignments are still missing (plan may be malformed)
        if not _re.search(r"(TOOL:|AGENT:|→\s*SELF)", plan_text):
            log.warning("agent.planning_no_executors", hint="plan lacks TOOL/AGENT/SELF assignments")

        # Inject plan into context so the main loop continues from it,
        # and into messages (with is_plan flag) so the UI can render a plan card after reload.
        context.append(Message(role="assistant", content=plan_text))
        if messages is not None:
            messages.append(Message(role="assistant", content=plan_text, is_plan=True))

        # Prompt execution: without this the model treats the plan as a completed response.
        context.append(Message(
            role="system",
            content="Plan is ready. Execute it now step by step, starting with step 1. Use the todo tool to track progress.",
        ))

        # Auto-populate todo from plan steps — model only needs to call 'update' after each step.
        # Best effort: a failure is logged and the plan is still delivered.
        _todo_steps = _parse_plan_steps(plan_text)
        if _todo_steps:
            try:
                from navi.tools.todo import _plans, _Task
                from navi.tools.base import current_session_id as _sid_var
                # Falls back to a shared "__default__" bucket when no session id is set.
                _sid = _sid_var.get() or "__default__"
                _plans[_sid] = [_Task(text=s) for s in _todo_steps]
                log.debug("agent.todo_auto_populated", steps=len(_todo_steps), session=_sid)
            except Exception:
                log.warning("agent.todo_auto_populate_failed", exc_info=True)

        # `phases` is logged as 3 when advisor feedback was used, otherwise 2.
        log.debug("agent.plan_ready", phases=3 if advisor_feedback else 2, length=len(plan_text))
        if not is_subagent:
            yield PlanningDebugData(log=_dbg)
        yield PlanReady(plan=plan_text, is_subagent=is_subagent)

    async def _run_workers(
        self,
        session,
        llm: LLMBackend,
        model: str,
        context_tokens: int | None,
    ) -> list[AgentEvent]:
        """Invoke each registered worker in order and gather the events they emit.

        Every worker receives the same WorkerContext. A worker that raises is
        logged as ``agent.worker_failed`` and skipped, so the remaining workers
        still run and already-collected events are kept.
        """
        from navi.workers.base import WorkerContext

        # One shared context object describing this turn for all workers.
        worker_ctx = WorkerContext(
            session_id=session.id,
            context_tokens=context_tokens,
            max_context_tokens=settings.ollama_num_ctx,
            llm=llm,
            model=model,
            temperature=settings.context_summary_temperature,
            session_store=self._sessions,
        )

        collected: list[AgentEvent] = []
        for current in self._workers:
            try:
                outcome = await current.run(session, worker_ctx)
                collected += outcome.events
            except Exception:
                # Best effort: one failing worker must not block the others.
                log.warning(
                    "agent.worker_failed",
                    worker=type(current).__name__,
                    exc_info=True,
                )
        return collected

    async def _memory_msg(self) -> "Message | None":
        """Return an ephemeral system message with the user memory summary, or None."""
        if not self._memory:
            return None
        summary = await self._memory.get_summary()
        if not summary:
            return None
        return Message(role="system", content=f"## What I remember about the user\n\n{summary}")

    async def _collect_context_injections(self, profile: "AgentProfile") -> list[Message]:
        """Run context providers for this profile and return system messages to inject."""
        if not self._cp_registry:
            return []
        providers = self._cp_registry.get_globals()
        for p in self._cp_registry.get_named(profile.context_providers):
            if p not in providers:
                providers.append(p)
        msgs: list[Message] = []
        for provider in providers:
            try:
                text = await provider.get_context()
                if text:
                    msgs.append(Message(role="system", content=text))
            except Exception as exc:
                log.warning("context_provider.error", name=getattr(provider, "name", "?"), error=str(exc))
        return msgs

    def _build_context(
        self,
        session_context: list[Message],
        profile: "AgentProfile",
        mem: "Message | None",
        iteration: int | None = None,
        max_iterations: int | None = None,
        extra_system: list[Message] | None = None,
    ) -> list[Message]:
        """Assemble the message list sent to the LLM for a single call.

        Layout, in order:
          1. a fresh system prompt rendered from the current profile — it is
             never persisted in session.context, so a profile switch applies
             immediately without rewriting stored history;
          2. the memory message, if any;
          3. any ``extra_system`` messages;
          4. the stored conversation with system messages filtered out
             (migration safety);
          5. when the profile enables the iteration budget and both counters
             are supplied, a trailing system note with the iteration progress,
             worded more urgently as the remaining count shrinks.
        """
        assembled: list[Message] = [
            Message(role="system", content=self._build_system_prompt(profile)),
        ]
        if mem:
            assembled.append(mem)
        if extra_system:
            assembled += extra_system
        assembled += [entry for entry in session_context if entry.role != "system"]

        counters_known = iteration is not None and max_iterations is not None
        if not (profile.iteration_budget_enabled and counters_known):
            return assembled

        left = max_iterations - iteration
        if left > 7:
            urgency = ""
        elif left > 3:
            urgency = " Start wrapping up: prioritize completing current work over starting new subtasks."
        else:
            urgency = f" CRITICAL: only {left} iteration(s) left — finish or produce a partial result now, do not start new subtasks."

        budget_note = f"[Iteration {iteration + 1}/{max_iterations} — {left} remaining.{urgency}]"
        assembled.append(Message(role="system", content=budget_note))
        return assembled

    def _build_system_prompt(self, profile: "AgentProfile") -> str:
        """Render the system prompt for ``profile``.

        Sections, joined by a horizontal-rule separator: the global persona
        (when non-blank), the profile's own system prompt, and — when other
        profiles exist — a compact overview of them with switching guidance.
        """
        sections: list[str] = []

        if persona_text := settings.navi_persona.strip():
            sections.append(persona_text)

        sections.append(profile.system_prompt)

        # Overview of the other profiles is computed on every call, so newly
        # registered profiles show up without any prompt edits.
        siblings = [cand for cand in self._profiles.all() if cand.id != profile.id]
        if siblings:
            overview = ["## Available profiles", f"Current: **{profile.id}**"]
            overview.extend(
                f"· {sib.id}: {sib.short_description or sib.description}"
                for sib in siblings
            )
            overview.append("→ Switch profiles on your own judgment — do not ask for permission. When a task clearly fits another profile, call switch_profile immediately, then inform the user which profile is now active and why. Use list_profiles if you need details about a profile's capabilities.")
            sections.append("\n".join(overview))

        return "\n\n---\n\n".join(sections)

    def _build_goal_anchor(self, session_id: str, user_message: str) -> Message:
        """Build a goal-anchor system message injected every N iterations.

        Reminds the model of the original user request and the current todo
        state so it doesn't drift on long multi-step tasks.

        Args:
            session_id: Key used to look up the session's todo list in
                ``navi.tools.todo._plans``.
            user_message: The original user request, echoed verbatim.

        Returns:
            A ``system`` message containing the request, the todo list (when
            one exists and could be read), and a stay-on-track reminder.
        """
        lines = [
            "[Goal anchor]",
            f"Original request: {user_message}",
        ]

        try:
            # Imported once, up front, instead of once per task inside the loop.
            from navi.tools.todo import _STATUS_ICON, _plans

            tasks = _plans.get(session_id, [])
            if tasks:
                lines.append("Current todo:")
                for i, t in enumerate(tasks):
                    icon = _STATUS_ICON.get(t.status, "?")
                    lines.append(f"  {icon} [{i}] {t.text} ({t.status})")
        except Exception as exc:
            # Best-effort: the anchor is still useful without the todo section,
            # so never fail the turn here — but leave a trace for debugging.
            log.debug("goal_anchor.todo_unavailable", session_id=session_id, error=str(exc))

        lines.append("Stay on track — complete the remaining pending/in_progress steps.")
        return Message(role="system", content="\n".join(lines))

    def _tool_list(self, enabled: list[str]) -> list[Tool]:
        """Resolve tool names into ``Tool`` objects.

        The name list is ``enabled`` followed by any names returned by
        ``_load_user_enabled_tools()`` that are not already present. Each name
        is looked up via ``self._tools.get``; a name whose lookup raises is
        skipped (with a warning logged) rather than failing the whole turn.

        Args:
            enabled: Tool names to resolve, in order.

        Returns:
            The successfully resolved tools, in name order.
        """
        names = list(enabled)
        for name in _load_user_enabled_tools():
            if name not in names:
                names.append(name)

        resolved: list[Tool] = []
        for name in names:
            try:
                resolved.append(self._tools.get(name))
            except Exception as exc:
                # Best-effort: keep going with the remaining tools, but make
                # the dropped name visible instead of swallowing it silently.
                log.warning("tool.unavailable", tool=name, error=str(exc))
        return resolved

    def _get_backend(self, backend_key: str) -> LLMBackend:
        """Return whatever ``self._backends.get`` yields for *backend_key*."""
        backend = self._backends.get(backend_key)
        return backend

    def _check_context_size(self, context: list[Message]) -> None:
        """Raise ContextTooLargeError before an LLM call if the context is dangerously large.

        Uses a conservative character-based estimate (~4 chars per token for text).
        Images are counted at 500 tokens each (rough vision-model estimate).

        Checks against the *remaining* budget, not a fixed percentage of the window:
            available_for_input = ollama_num_ctx - output_reserve
        where output_reserve is a fixed token headroom reserved for the model's response.
        This correctly accounts for sessions where conversation history already consumes
        a large portion of the window.

        Args:
            context: Messages about to be sent to the LLM. The last message is
                treated as the "new content" in the error text.

        Raises:
            ContextTooLargeError: If the estimated token total exceeds
                ``settings.ollama_num_ctx - output_reserve``.
        """
        if not context:
            return

        output_reserve = 2048  # tokens reserved for the model's own response
        tokens_per_image = 500  # rough per-image estimate for vision models

        def _estimate(msgs: list[Message]) -> int:
            chars = sum(len(m.content or "") for m in msgs)
            # Count every attached image, not just every message carrying one:
            # a message with three images costs three times as much.
            imgs = sum(tokens_per_image * len(m.images) for m in msgs if m.images)
            return chars // 4 + imgs

        total = _estimate(context)
        available = settings.ollama_num_ctx - output_reserve

        if total > available:
            # Split the estimate so the error can say how much the newest
            # message adds versus what history had already used.
            existing = _estimate(context[:-1])
            new = _estimate(context[-1:])
            remaining = available - existing
            raise ContextTooLargeError(
                f"Context too large: new content is ~{new:,} estimated tokens, "
                f"but only ~{max(0, remaining):,} tokens are available "
                f"(window {settings.ollama_num_ctx:,}, already used ~{existing:,}, "
                f"output_reserve {output_reserve:,}). "
                "Split the file into smaller parts or delegate to a subagent."
            )

    async def _run_single_tool(
        self,
        tc: ToolCallRequest,
        tool_map: dict[str, Tool],
    ) -> tuple[ToolEvent, Message, "Message | None"]:
        """Run a single tool call and package its outcome.

        Returns a triple ``(event, tool_message, image_message)``:
          * ``event`` — a ToolEvent describing the call and its result text;
          * ``tool_message`` — the ``tool``-role message answering ``tc.id``;
          * ``image_message`` — a ``user``-role message carrying the base64
            image when a successful result's metadata flags ``is_image`` and
            provides ``base64``; otherwise ``None``.

        An unknown tool name yields an error text and ``success=False`` instead
        of raising.

        Called via asyncio.create_task() from run_stream() so that the parent
        generator can drain the event sink queue concurrently.
        """
        handler = tool_map.get(tc.name)
        follow_up: "Message | None" = None

        if handler is not None:
            log.info("tool.execute", tool=tc.name, args=tc.arguments)
            outcome = await handler.execute(tc.arguments)
            text = outcome.to_message_content()
            succeeded = outcome.success
            if outcome.success:
                meta = outcome.metadata
                if meta and meta.get("is_image"):
                    encoded = meta.get("base64")
                    if encoded:
                        follow_up = Message(
                            role="user",
                            content=f"[Image loaded via {tc.name} — analyse it]",
                            images=[encoded],
                        )
        else:
            text = f"Error: tool '{tc.name}' not found."
            succeeded = False

        event = ToolEvent(
            tool_name=tc.name,
            arguments=tc.arguments,
            result=text,
            success=succeeded,
        )
        reply = Message(role="tool", content=text, tool_call_id=tc.id, name=tc.name)
        return event, reply, follow_up

    async def _execute_tool_calls(
        self, tool_calls: list[ToolCallRequest], tools: list[Tool]
    ) -> tuple[list[Message], list[Message]]:
        """Run all *tool_calls* concurrently and collect the resulting messages.

        Each call is executed by ``_run_single_tool`` (the same per-call logic
        used by the streaming variant), so unknown-tool handling and image
        extraction stay in one place. The ToolEvent it produces is not needed
        here and is discarded.

        Args:
            tool_calls: Tool invocations requested by the model.
            tools: Tools available for this turn; looked up by ``name``.

        Returns:
            ``(tool_msgs, image_msgs)`` — one ``tool`` message per call, in
            call order, plus the image messages for calls that produced one.
        """
        tool_map = {t.name: t for t in tools}

        # gather() preserves input order, so results line up with tool_calls.
        results = await asyncio.gather(
            *(self._run_single_tool(tc, tool_map) for tc in tool_calls)
        )
        tool_msgs = [tool_msg for _event, tool_msg, _image in results]
        image_msgs = [image for _event, _tool_msg, image in results if image is not None]
        return tool_msgs, image_msgs

    async def _execute_tool_calls_streaming(
        self, tool_calls: list[ToolCallRequest], tools: list[Tool]
    ) -> tuple[list[tuple[ToolEvent, Message]], list[Message]]:
        """Run all *tool_calls* concurrently, keeping a ToolEvent per call.

        Each call is executed by ``_run_single_tool`` rather than a private
        copy of the same logic, so the two code paths cannot drift apart.

        Args:
            tool_calls: Tool invocations requested by the model.
            tools: Tools available for this turn; looked up by ``name``.

        Returns:
            ``(pairs, image_msgs)`` — ``pairs`` holds ``(ToolEvent, tool
            message)`` for every call in call order; ``image_msgs`` holds the
            image messages for calls that produced one.
        """
        tool_map = {t.name: t for t in tools}

        # gather() preserves input order, so results line up with tool_calls.
        triples = await asyncio.gather(
            *(self._run_single_tool(tc, tool_map) for tc in tool_calls)
        )
        pairs = [(event, msg) for event, msg, _image in triples]
        image_msgs = [image for _event, _msg, image in triples if image is not None]
        return pairs, image_msgs