# navi-1 / navi / core / agent.py
"""
Agent: the tool-calling loop.

Flow:
1. Receive user message, load session + profile
2. Build tool schemas from profile's enabled_tools
3. Loop (up to max_iterations):
   a. Call LLM with session.context (may be compressed) + tool schemas
   b. If finish_reason == "stop"  -> stream final response
   c. If finish_reason == "tool_calls" -> execute tools, append to both
      session.messages (display history) and session.context (LLM context)
4. After StreamEnd: run workers sequentially (e.g. context compression)

session.messages — full display history, never compressed
session.context  — what the LLM sees; workers may compress this
"""

import asyncio
import json
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import TYPE_CHECKING, AsyncGenerator

import structlog

from navi.config import settings
from navi.exceptions import ContextTooLargeError, LLMBackendError, LLMConnectionError, MaxIterationsReached, SessionNotFound
from navi.llm.base import LLMBackend, LLMChunk, Message, ToolCallRequest
from navi.tools.base import Tool, current_event_sink, current_stop_event

from .compressor import compress_context, should_compress
from .context_builder import ContextBuilder
from .planning import PlanningEngine
from .events import (
    AgentEvent,
    AIHelperTokensUsed,
    ContextCompressed,
    PlanningDebugData,
    PlanningStatus,
    PlanReady,
    StreamEnd,
    StreamStopped,
    SubagentComplete,
    TextDelta,
    ThinkingDelta,
    ThinkingEnd,
    ToolEvent,
    ToolStarted,
    TurnThinking,
)
from .registry import BackendRegistry, ProfileRegistry, ToolRegistry
from .session import SessionStore
from .tool_executor import ToolExecutor

if TYPE_CHECKING:
    from navi.context_providers._loader import ContextProviderRegistry
    from navi.memory.store import MemoryStore
    from navi.workers.base import Worker, WorkerContext

# Location of "enabled.json" directly under the configured tools directory.
# It is read by _load_user_enabled_tools(), whose return annotation suggests
# the file is expected to hold a JSON list of tool names.
_USER_ENABLED_FILE = Path(settings.tools_dir) / "enabled.json"


async def _iter_stream_guarded(
    stream_gen: "AsyncGenerator[LLMChunk, None]",
    stop_event: "asyncio.Event | None",
    first_chunk_timeout: float,
    chunk_timeout: float,
) -> "AsyncGenerator[LLMChunk, None]":
    """
    Re-yield the chunks of ``stream_gen`` while staying stoppable and time-bounded.

    Two safeguards are layered on top of the raw LLM stream:

    1. The Stop button keeps working while no chunks arrive.
       The agent itself only looks at stop_event between chunks, but during
       prefill (input-token processing) the backend may emit nothing for a
       long time. Each chunk is therefore awaited in one-second slices and
       stop_event is consulted after every slice that produced no chunk.

    2. Timeouts as a last line of defence.
       first_chunk_timeout bounds the wait for the very first chunk;
       chunk_timeout bounds the gap before every later chunk. The wait is
       counted in whole one-second slices. When the bound is reached an
       LLMBackendError is raised.

    Whichever way the wrapper ends (exhaustion, stop, timeout, error, or the
    consumer closing it), the pending read is cancelled and ``stream_gen`` is
    closed, which drops the HTTP connection to the backend so it stops
    generating.
    """

    async def _abandon(task: "asyncio.Task") -> None:
        # Cancel an in-flight __anext__ and wait until it has settled.
        # Whatever it raises while unwinding is deliberately ignored.
        task.cancel()
        try:
            await task
        except (asyncio.CancelledError, Exception):
            pass

    awaiting_first = True
    pending: asyncio.Task | None = None
    try:
        while True:
            budget = first_chunk_timeout if awaiting_first else chunk_timeout
            # Exactly one __anext__ task per chunk; the same task is polled
            # repeatedly so two reads are never in flight at once.
            pending = asyncio.ensure_future(stream_gen.__anext__())
            waited = 0.0

            # asyncio.wait() returns (done, pending); an empty `done` set
            # means the one-second slice elapsed without a chunk.
            while not (await asyncio.wait({pending}, timeout=1.0))[0]:
                waited += 1.0
                if stop_event and stop_event.is_set():
                    await _abandon(pending)
                    pending = None
                    return
                if waited >= budget:
                    await _abandon(pending)
                    pending = None
                    if awaiting_first:
                        label = "first token (context may be too large for this model)"
                    else:
                        label = "next token"
                    raise LLMBackendError(
                        f"LLM timed out after {waited:.0f}s waiting for {label}."
                    )

            try:
                chunk = pending.result()
            except StopAsyncIteration:
                # The underlying stream is exhausted.
                pending = None
                return

            pending = None
            awaiting_first = False
            yield chunk

            # Honour a stop request raised while the consumer handled the chunk.
            if stop_event and stop_event.is_set():
                return

    finally:
        # Never leave a half-finished __anext__ behind: it would keep the
        # HTTP connection to the backend open.
        if pending is not None and not pending.done():
            await _abandon(pending)
        # Closing the generator terminates the connection, which tells the
        # backend to stop generating.
        try:
            await stream_gen.aclose()
        except Exception:
            pass


def _load_user_enabled_tools() -> list[str]:
    """Return the tool names stored in the user's ``enabled.json`` file.

    Loading is best-effort: a missing or unreadable file, undecodable bytes,
    malformed JSON, or a payload that is not a JSON list all yield an empty
    list instead of raising. Entries that are not strings are dropped so the
    result always matches the declared ``list[str]`` return type.
    """
    try:
        # JSON is UTF-8 by specification; do not depend on the locale default.
        payload = json.loads(_USER_ENABLED_FILE.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # OSError covers a missing or unreadable file. ValueError covers both
        # json.JSONDecodeError and UnicodeDecodeError.
        return []
    if not isinstance(payload, list):
        return []
    return [name for name in payload if isinstance(name, str)]


# Module-level structlog logger used throughout this file.
log = structlog.get_logger()

# Sentinel: placed in the event sink by the tool wrapper to signal completion.
_TOOL_DONE = object()

# Sub-agents are execution workers. If a sub-agent produces only thinking for a
# long time without text or tool calls, local models can degenerate into endless
# internal-token loops and keep the GPU busy with no user-visible progress.
# Agent.run_ephemeral() aborts a turn as soon as either limit is reached:
#   - seconds elapsed since the first thinking chunk of the turn, or
#   - number of characters of thinking text accumulated in the turn.
_SUBAGENT_THINKING_STALL_SECONDS = 60.0
_SUBAGENT_THINKING_STALL_CHARS = 12_000


def _todo_status_snapshot(session_id: str) -> frozenset[tuple[str, str]]:
    """Snapshot the session's todo list as a frozenset of (task_text, status) pairs.

    The anti-stall detector takes one snapshot before an iteration and one
    after it; any difference between the two means a task changed status,
    i.e. the model made real progress.
    """
    # Resolved at call time rather than at module import.
    from navi.tools.todo import get_task_snapshot as _take_snapshot

    return _take_snapshot(session_id)


def _todo_failed_steps(session_id: str) -> frozenset[tuple[int, str]]:
    """Collect the session's failed todo steps as (1-based index, task_text) pairs."""
    # Resolved at call time rather than at module import.
    from navi.tools.todo import get_failed_steps as _failed_steps_of

    return _failed_steps_of(session_id)


def _todo_progress_message(session_id: str, *, first_iteration: bool = False) -> "Message | None":
    """Produce the compact system reminder describing todo state and update discipline.

    ``first_iteration`` is forwarded unchanged to the todo tool's
    ``get_progress_message``; the result may be ``None``.
    """
    # Resolved at call time rather than at module import.
    from navi.tools.todo import get_progress_message as _build_reminder

    return _build_reminder(session_id, first_iteration=first_iteration)


class Agent:
    def __init__(
        self,
        session_store: "SessionStore | None",
        profile_registry: ProfileRegistry,
        tool_registry: ToolRegistry,
        backend_registry: BackendRegistry,
        workers: list["Worker"] | None = None,
        memory_store: "MemoryStore | None" = None,
        cp_registry: "ContextProviderRegistry | None" = None,
        mcp_manager=None,
    ) -> None:
        """Store the injected collaborators and build the agent's helper objects.

        Args:
            session_store: Session persistence; may be ``None``.
            profile_registry: Lookup of profiles by id.
            tool_registry: Registry handed to the ToolExecutor.
            backend_registry: Registry of LLM backends.
            workers: Optional workers; a missing or empty value becomes a
                new empty list.
            memory_store: Optional memory store, shared with the ContextBuilder.
            cp_registry: Optional context-provider registry, shared with the
                ContextBuilder.
            mcp_manager: Optional MCP manager, shared with the ContextBuilder.
        """
        # Registries and stores supplied by the caller.
        self._sessions = session_store
        self._profiles = profile_registry
        self._tools = tool_registry
        self._backends = backend_registry
        self._workers: list["Worker"] = workers if workers else []
        self._memory = memory_store
        self._cp_registry = cp_registry
        self._mcp_manager = mcp_manager

        # Helpers built on top of the collaborators. The planning engine
        # shares the same context builder instance as the agent.
        context_builder = ContextBuilder(
            profile_registry=profile_registry,
            memory_store=memory_store,
            cp_registry=cp_registry,
            mcp_manager=mcp_manager,
        )
        self._ctx_builder = context_builder
        self._tool_executor = ToolExecutor(tool_registry)
        self._planning = PlanningEngine(context_builder)

    # ------------------------------------------------------------------
    # Public interface
    # ------------------------------------------------------------------

    async def run(self, session_id: str, user_message: str, images: list[str] | None = None) -> str:
        """Non-streaming: run the full tool-calling loop and return the final text.

        The user message is appended to both ``session.messages`` and
        ``session.context`` and saved before the first LLM call. Each
        iteration calls ``llm.complete``; a tool-call turn executes the tools
        and appends the results to both lists, while image injections go to
        the context only.

        Args:
            session_id: Id of an existing session.
            user_message: Text of the user's message.
            images: Optional images attached to the user message.

        Returns:
            The content of the final assistant message ("" when the model
            returned no content).

        Raises:
            SessionNotFound: No session exists for ``session_id``.
            MaxIterationsReached: The profile's iteration limit was used up
                without a final answer. The session is saved first.
        """
        session = await self._sessions.get(session_id)
        if session is None:
            raise SessionNotFound(session_id)

        profile = self._profiles.get(session.profile_id)
        tools = self._tool_list(profile.enabled_tools, profile.mcp_servers)
        tool_schemas = [t.schema() for t in tools]
        llm = self._get_backend(profile.llm_backend)

        mem = await self._ctx_builder._memory_msg(user_id=session.user_id)

        # Expose session_id (e.g. for the SSH connection pool) and model to
        # tools via ContextVar. run_stream() and run_ephemeral() publish both;
        # doing the same here keeps tools from seeing a stale or unset model.
        from navi.tools.base import current_session_id as _sid_var, current_model as _model_var
        _sid_var.set(session_id)
        _model_var.set(profile.model)

        # current_user_id and current_user_role are set by the caller
        # (websocket_session or messages endpoint) before run()/run_stream()

        user_msg = Message(role="user", content=user_message, images=images or None,
                           created_at=datetime.now(timezone.utc))
        session.messages.append(user_msg)
        session.context.append(user_msg)
        await self._sessions.save(session)

        ctx_injections = await self._ctx_builder._collect_context_injections(profile)
        for iteration in range(profile.max_iterations):
            log.debug("agent.iteration", session_id=session_id, iteration=iteration)
            response = await llm.complete(
                self._ctx_builder.build(session.context, profile, mem,
                                        iteration=iteration, max_iterations=profile.max_iterations,
                                        extra_system=ctx_injections,
                                        session_id=session_id),
                tools=tool_schemas if tools else None,
                temperature=profile.temperature,
                model=profile.model,
                top_k=profile.top_k,
                top_p=profile.top_p,
                num_thread=profile.num_thread,
            )

            # Final answer: the model said "stop" or requested no tools.
            if response.finish_reason == "stop" or not response.tool_calls:
                content = response.content or ""
                assistant_msg = Message(role="assistant", content=content,
                                        created_at=datetime.now(timezone.utc))
                session.messages.append(assistant_msg)
                session.context.append(assistant_msg)
                await self._sessions.save(session)
                return content

            # Tool calls turn — append to both messages and context
            assistant_msg = Message(
                role="assistant",
                content=response.content,
                tool_calls=response.tool_calls,
            )
            session.messages.append(assistant_msg)
            session.context.append(assistant_msg)

            tool_results, image_injections = await self._tool_executor._execute_tool_calls(response.tool_calls, tools)
            session.messages.extend(tool_results)
            session.context.extend(tool_results)
            # Image injections are synthetic LLM helpers — context only
            session.context.extend(image_injections)

        # Iteration budget exhausted: persist what we have, then signal it.
        await self._sessions.save(session)
        raise MaxIterationsReached(profile.max_iterations)

    async def run_ephemeral(
        self,
        user_message: str,
        profile_id: str,
        max_iterations: int = 40,
        exclude_tools: list[str] | None = None,
        briefing: str | None = None,
        custom_system_prompt: str | None = None,
        context_transfer: str | None = None,
        parent_session_id: str | None = None,
        timeout_seconds: float = 300.0,
    ) -> tuple[str, bool]:
        """
        Run a sub-agent loop without a persistent session.

        Returns (result_text, completed_normally).
        completed_normally is False if the sub-agent was stopped, stalled in
        thinking, hit the iteration limit or timed out.

        Intended for spawning from tools (e.g. SpawnAgentTool).
        Nothing is written to the session store — the conversation lives in a
        temporary in-memory context. The parent session is only read, to look
        up its user id, when parent_session_id is given.
        Tools listed in exclude_tools are stripped from the tool list
        (use this to prevent recursion: exclude 'spawn_agent').

        System prompt structure (completely separate from the parent's system prompt):
          1. profile.subagent_system_prompt  — focused executor persona
          2. custom_system_prompt            — optional role specialisation for this task
          3. briefing                        — task context (credentials, paths, instructions)
          4. a parent-session block          — only when parent_session_id is given
        If none of these is present, profile.system_prompt is used instead.

        context_transfer: text from the parent's scratchpad context_transfer section,
            injected as a priming exchange before the task message.
        parent_session_id: parent chat session id. When provided, sub-agent
            tool calls run in that session context so session-aware tools resolve
            filenames against the user's session directory, not an ephemeral id.
        timeout_seconds: wall-clock budget for the sub-agent run. It is checked
            at the start of every iteration; an LLM call or tool call that is
            already in progress is not interrupted by it.

        The tool-facing ContextVars (session id, model, user id/role/info) are
        overridden for the duration of the run and restored on every exit
        path, including failures during setup.
        """
        import time as _time
        import uuid as _uuid
        from contextlib import aclosing as _aclosing
        from navi.tools.base import (
            current_session_id as _sid_var,
            current_model as _model_var,
            current_user_id as _uid_var,
            current_user_role as _role_var,
            current_user_info as _uinfo_var,
        )
        _prev_sid = _sid_var.get(None)
        _prev_model = _model_var.get(None)
        _prev_uid = _uid_var.get(None)
        _prev_role = _role_var.get()
        _prev_uinfo = _uinfo_var.get(None)

        # Everything that mutates the ContextVars lives inside this try, so a
        # failure during setup (unknown profile, session-store or memory
        # error) still restores the parent's values in the finally below.
        try:
            subagent_run_id = f"subagent_{_uuid.uuid4().hex[:12]}"
            tool_session_id = parent_session_id or subagent_run_id
            _sid_var.set(tool_session_id)

            profile = self._profiles.get(profile_id)
            _model_var.set(profile.model)
            exclude = set(exclude_tools or [])

            # Use dedicated subagent_tools if configured, else fall back to enabled_tools.
            tool_source = profile.subagent_tools if profile.subagent_tools else profile.enabled_tools
            tools = [t for t in self._tool_list(tool_source, profile.mcp_servers) if t.name not in exclude]
            tool_schemas = [t.schema() for t in tools]
            llm = self._get_backend(profile.llm_backend)

            # Inherit the parent's user identity when the parent session can be
            # resolved; otherwise run as an anonymous "user".
            user_id = None
            if parent_session_id and self._sessions:
                parent_session = await self._sessions.get(parent_session_id)
                if parent_session:
                    user_id = parent_session.user_id
            if user_id is not None:
                _uid_var.set(user_id)
                _role_var.set(_prev_role or "user")
                _uinfo_var.set(_prev_uinfo)
            else:
                _uid_var.set(None)
                _role_var.set("user")
                _uinfo_var.set(None)
            mem = await self._ctx_builder._memory_msg(user_id=user_id)

            # Build subagent system prompt — completely separate from the parent's system prompt.
            # No persona, no orchestrator instructions, no profiles block.
            # Structure: executor persona → role specialisation → task context (briefing)
            sys_parts: list[str] = []
            if profile.subagent_system_prompt:
                sys_parts.append(profile.subagent_system_prompt)
            if custom_system_prompt:
                sys_parts.append(custom_system_prompt)
            if briefing:
                sys_parts.append(f"## Task context\n\n{briefing}")
            if parent_session_id:
                sys_parts.append(
                    "[Parent session context]\n"
                    f"Parent Session ID: {parent_session_id}\n"
                    f"Session files directory: {settings.session_files_dir}/{parent_session_id}/\n"
                    "For files the user should see, write to this exact session directory. "
                    "Do not use or invent a subagent_* directory."
                )
            if not sys_parts:
                # Fallback if profile has no subagent_system_prompt defined
                sys_parts.append(profile.system_prompt)
            subagent_sys_msg = Message(role="system", content="\n\n---\n\n".join(sys_parts))

            # Build initial context.
            # If context_transfer is provided, inject it as a priming exchange so the
            # sub-agent has the parent's working state from the start.
            context: list[Message] = []
            if context_transfer:
                context.append(Message(
                    role="user",
                    content=f"## Context from parent agent\n\n{context_transfer}",
                ))
                context.append(Message(
                    role="assistant",
                    content="Understood. I have the context. Ready to begin the task.",
                ))
            context.append(Message(role="user", content=user_message, created_at=datetime.now(timezone.utc)))

            # Read the event sink set by the parent run_stream() for this tool call.
            # If None (e.g. called from run(), not run_stream()), events are silently dropped.
            sink = current_event_sink.get()

            log.info("agent.subagent.start", profile_id=profile_id, max_iterations=max_iterations,
                     tools=len(tools), planning=profile.subagent_planning_enabled)

            stop_event = current_stop_event.get()
            tool_map = {t.name: t for t in tools}
            # A sub-agent specific think setting wins; None means "inherit".
            subagent_think = (
                profile.subagent_think_enabled
                if profile.subagent_think_enabled is not None
                else profile.think_enabled
            )

            _sub_tokens: int = 0        # tokens reported by the most recent LLM call
            _sub_tool_count: int = 0    # total tool calls across all iterations
            _start_time = _time.monotonic()
            accumulated_text = ""

            # ── Optional planning phase ────────────────────────────────────────────
            if profile.subagent_planning_enabled:
                async for _ev in self._planning.run(
                    context, profile, llm, mem, tool_schemas,
                    system_prompt_override=subagent_sys_msg.content,
                    is_subagent=True,
                ):
                    if isinstance(_ev, AIHelperTokensUsed):
                        pass  # token accounting only, not forwarded
                    elif sink is not None:
                        await sink.put(_ev)

            # ── Tool-calling loop ──────────────────────────────────────────────────
            for iteration in range(max_iterations):
                if stop_event and stop_event.is_set():
                    return accumulated_text, False

                elapsed = _time.monotonic() - _start_time
                if elapsed >= timeout_seconds:
                    log.warning("agent.subagent.timeout", elapsed=elapsed, timeout=timeout_seconds)
                    if sink is not None:
                        await sink.put(SubagentComplete(token_count=_sub_tokens, tool_call_count=_sub_tool_count))
                    return accumulated_text or "[Sub-agent timed out]", False

                log.debug("agent.subagent.iteration", iteration=iteration)

                accumulated_text = ""
                accumulated_thinking = ""
                turn_tool_calls: list[ToolCallRequest] | None = None
                turn_tokens: int | None = None
                thinking_started_at: float | None = None
                thinking_stalled_reason: str | None = None

                # Build context inline — no persona or profiles block for subagents.
                built_ctx: list[Message] = [subagent_sys_msg]
                if mem:
                    built_ctx.append(mem)
                mcp_msg = self._ctx_builder._mcp_context_msg()
                if mcp_msg:
                    built_ctx.append(mcp_msg)
                built_ctx.extend(m for m in context if m.role != "system")
                self._check_context_size(built_ctx)

                stream = _iter_stream_guarded(
                    llm.stream_complete(
                        built_ctx,
                        tools=tool_schemas if tools else None,
                        temperature=profile.temperature,
                        model=profile.model,
                        think=subagent_think,
                        top_k=profile.top_k,
                        top_p=profile.top_p,
                        num_thread=profile.num_thread,
                    ),
                    stop_event=stop_event,
                    first_chunk_timeout=settings.llm_stream_first_chunk_timeout,
                    chunk_timeout=settings.llm_stream_chunk_timeout,
                )
                # aclosing() closes the guarded stream the moment we leave the
                # loop — including the early `break` on a thinking stall — so
                # the backend connection is dropped right away instead of
                # whenever the abandoned generator gets garbage-collected.
                async with _aclosing(stream):
                    async for chunk in stream:
                        if chunk.prompt_tokens is not None or chunk.completion_tokens is not None:
                            turn_tokens = (chunk.prompt_tokens or 0) + (chunk.completion_tokens or 0)
                        if chunk.thinking:
                            if thinking_started_at is None:
                                thinking_started_at = _time.monotonic()
                            accumulated_thinking += chunk.thinking
                            thinking_elapsed = _time.monotonic() - thinking_started_at
                            if (
                                thinking_elapsed >= _SUBAGENT_THINKING_STALL_SECONDS
                                or len(accumulated_thinking) >= _SUBAGENT_THINKING_STALL_CHARS
                            ):
                                thinking_stalled_reason = (
                                    "Sub-agent produced only thinking output for "
                                    f"{thinking_elapsed:.0f}s / {len(accumulated_thinking)} chars "
                                    "without text or tool calls."
                                )
                                log.warning(
                                    "agent.subagent.thinking_stall",
                                    elapsed=thinking_elapsed,
                                    chars=len(accumulated_thinking),
                                    profile_id=profile_id,
                                )
                                break
                        if chunk.delta:
                            accumulated_text += chunk.delta
                        if chunk.tool_calls:
                            turn_tool_calls = chunk.tool_calls

                # Remember the latest usage figure so the timeout and
                # iteration-limit exits report it instead of always 0.
                if turn_tokens is not None:
                    _sub_tokens = turn_tokens

                if stop_event and stop_event.is_set():
                    return accumulated_text, False

                if thinking_stalled_reason:
                    if sink is not None:
                        await sink.put(SubagentComplete(
                            token_count=turn_tokens or 0,
                            tool_call_count=_sub_tool_count,
                        ))
                    return f"[{thinking_stalled_reason}]", False

                # No tool calls requested: this turn's text is the final answer.
                if not turn_tool_calls:
                    log.info("agent.subagent.complete", iterations=iteration + 1,
                             result_len=len(accumulated_text))
                    _sub_tokens = turn_tokens or 0
                    if sink is not None:
                        await sink.put(SubagentComplete(
                            token_count=_sub_tokens,
                            tool_call_count=_sub_tool_count,
                        ))
                    return accumulated_text, True

                # Emit accumulated thinking before tool calls
                if accumulated_thinking and sink is not None:
                    log.debug("agent.subagent.turn_thinking", length=len(accumulated_thinking))
                    await sink.put(TurnThinking(thinking=accumulated_thinking, is_subagent=True))

                context.append(Message(
                    role="assistant",
                    content=accumulated_text or None,
                    tool_calls=turn_tool_calls,
                ))

                # Execute each tool call sequentially, emitting events to parent sink
                for tc in turn_tool_calls:
                    _sub_tool_count += 1
                    if sink is not None:
                        await sink.put(ToolStarted(
                            tool_name=tc.name, arguments=tc.arguments, is_subagent=True
                        ))

                    tool = tool_map.get(tc.name)
                    image_msg = None
                    metadata: dict = {}
                    if tool is None:
                        content = f"Error: tool '{tc.name}' not found."
                        success = False
                    else:
                        log.info("tool.execute.subagent", tool=tc.name, args=tc.arguments)
                        try:
                            result = await tool.execute(tc.arguments)
                            content = result.to_message_content()
                            success = result.success
                            metadata = result.metadata or {}
                            # A successful image result is additionally fed
                            # back as a user message carrying the image.
                            if result.success and result.metadata and result.metadata.get("is_image"):
                                b64 = result.metadata.get("base64")
                                if b64:
                                    image_msg = Message(
                                        role="user",
                                        content=f"[Image loaded via {tc.name} — analyse it]",
                                        images=[b64],
                                    )
                        except Exception as exc:
                            # A failing tool is reported to the model as an
                            # error result instead of aborting the sub-agent.
                            log.warning("agent.subagent.tool_exception", tool=tc.name, error=str(exc))
                            content = f"Error: {exc}"
                            success = False
                            metadata = {}

                    if sink is not None:
                        await sink.put(ToolEvent(
                            tool_name=tc.name, arguments=tc.arguments,
                            result=content, success=success, is_subagent=True,
                        ))

                    context.append(Message(role="tool", content=content,
                                           tool_call_id=tc.id, name=tc.name, metadata=metadata))
                    if image_msg:
                        context.append(image_msg)

            log.warning("agent.subagent.max_iterations", max_iterations=max_iterations)
            if sink is not None:
                await sink.put(SubagentComplete(token_count=_sub_tokens, tool_call_count=_sub_tool_count))
            return accumulated_text or "[Sub-agent reached iteration limit without a final answer]", False
        finally:
            # Restore parent ContextVar values so background tasks don't inherit stale subagent IDs
            _sid_var.set(_prev_sid)
            _model_var.set(_prev_model)
            _uid_var.set(_prev_uid)
            _role_var.set(_prev_role)
            _uinfo_var.set(_prev_uinfo)

    async def run_stream(
        self,
        session_id: str,
        user_message: str,
        images: list[str] | None = None,
        display_message: str | None = None,
    ) -> AsyncGenerator[AgentEvent, None]:
        """
        Streaming variant. Yields AgentEvent objects:
        - ThinkingDelta / ThinkingEnd: reasoning chunks
        - ToolEvent: tool call + result
        - TextDelta / StreamEnd: final streamed response
        - ContextCompressed: emitted by workers after compression

        Args:
            user_message: The text sent to the LLM (may contain injected hints).
            display_message: Optional text shown to the user in the chat UI.
                When omitted, user_message is used for both LLM context and display.
        """
        session = await self._sessions.get(session_id)
        if session is None:
            raise SessionNotFound(session_id)

        profile = self._profiles.get(session.profile_id)
        tools = self._tool_list(profile.enabled_tools, profile.mcp_servers)
        tool_schemas = [t.schema() for t in tools]
        llm = self._get_backend(profile.llm_backend)

        mem = await self._ctx_builder._memory_msg(user_id=session.user_id)

        # Expose session_id and model to tools via ContextVar
        from navi.tools.base import current_session_id as _sid_var, current_model as _model_var
        _sid_token = _sid_var.set(session_id)
        _model_var.set(profile.model)

        # Pre-turn compression: if the last turn filled the context past the
        # threshold, compress NOW before calling the LLM.  This prevents the
        # model from seeing an over-full context and generating gibberish
        # (e.g. a "summary of the conversation" instead of a real answer).
        if (
            settings.context_compression_enabled
            and session.context_token_count > 0
            and should_compress(
                session.context_token_count,
                settings.ollama_num_ctx,
                settings.context_compression_threshold,
            )
        ):
            event = await self._compress_session_context(
                session=session,
                llm=llm,
                model=profile.model,
                session_id=session_id,
                reason="preturn",
            )
            if event:
                yield event

        display_text = display_message if display_message is not None else user_message
        user_msg_display = Message(role="user", content=display_text, images=images or None,
                                   created_at=datetime.now(timezone.utc))
        user_msg_context = Message(role="user", content=user_message, images=images or None,
                                   created_at=datetime.now(timezone.utc))
        session.messages.append(user_msg_display)
        session.context.append(user_msg_context)
        # Persist user message immediately so it survives a client disconnect
        # before the assistant reply is ready.
        await self._sessions.save(session)

        stop_event = current_stop_event.get()
        _turn_start = time.monotonic()
        _tool_call_count = 0
        _subagent_tokens = 0
        _prev_tokens = session.context_token_count  # token baseline before this turn

        # Planning phase — always runs on the first user message in a session;
        # on subsequent messages uses the profile's planning_enabled flag.
        # force_plan suppresses the DIRECT shortcut: first message is always forced,
        # and planning_mandatory extends that to every subsequent message.
        _is_first_message = sum(1 for m in session.messages if m.role == "user") == 1
        _force_plan = _is_first_message or profile.planning_mandatory
        if _is_first_message or profile.planning_enabled:
            async for _ev in self._planning.run(session.context, profile, llm, mem, tool_schemas, messages=session.messages, force_plan=_force_plan):
                if isinstance(_ev, AIHelperTokensUsed):
                    _subagent_tokens += _ev.total
                elif isinstance(_ev, PlanningDebugData):
                    session.planning_logs.append(_ev.log)
                    # Cap to prevent unbounded DB growth on long sessions
                    _MAX_PLANNING_LOGS = 20
                    if len(session.planning_logs) > _MAX_PLANNING_LOGS:
                        session.planning_logs = session.planning_logs[-_MAX_PLANNING_LOGS:]
                else:
                    yield _ev

        # Anti-stall state — tracks consecutive iterations without progress.
        # Two independent signals: no todo status change, and repeated identical tool calls.
        _stall_no_todo = 0          # iterations since last todo status change
        _stall_repeat_tools = 0     # iterations with identical tool calls as the previous turn
        _prev_tool_sigs: frozenset[tuple[str, str]] = frozenset()

        # Adaptive re-plan state — detect newly-failed todo steps and inject a
        # re-planning prompt on the following iteration so the model revises its plan.
        _known_failed: frozenset[tuple[int, str]] = frozenset()
        _replan_msg: str | None = None

        ctx_injections = await self._ctx_builder._collect_context_injections(profile)

        # Tool-calling loop — uses stream_complete() for every turn so thinking
        # is captured in real-time via ThinkingDelta/ThinkingEnd events.
        for iteration in range(profile.max_iterations):
            # Cooperative stop: check before each LLM call
            if stop_event and stop_event.is_set():
                await self._sessions.save(session)
                yield StreamStopped()
                return

            if settings.context_compression_enabled and iteration > 0:
                preflight_ctx = self._ctx_builder.build(
                    session.context,
                    profile,
                    mem,
                    extra_system=ctx_injections,
                    session_id=session_id,
                )
                estimated_tokens = self._estimate_context_tokens(preflight_ctx)
                if should_compress(
                    estimated_tokens,
                    settings.ollama_num_ctx,
                    settings.context_compression_threshold,
                ):
                    event = await self._compress_session_context(
                        session=session,
                        llm=llm,
                        model=profile.model,
                        session_id=session_id,
                        reason="midturn",
                        keep_recent_messages=max(12, settings.context_keep_recent * 2),
                    )
                    if event:
                        yield event
                        _prev_tokens = 0

            accumulated_text = ""
            accumulated_thinking = ""
            turn_tool_calls: list[ToolCallRequest] | None = None
            thinking_active = False
            context_tokens: int | None = None

            built_ctx = self._ctx_builder.build(session.context, profile, mem,
                                                  iteration=iteration, max_iterations=profile.max_iterations,
                                                  extra_system=ctx_injections,
                                                  session_id=session_id)

            if (
                profile.goal_anchoring_enabled
                and iteration > 0
                and iteration % profile.goal_anchoring_interval == 0
            ):
                built_ctx.append(self._ctx_builder._build_goal_anchor(session_id, user_message))

            todo_msg = _todo_progress_message(session_id, first_iteration=(iteration == 0))
            if todo_msg:
                built_ctx.append(todo_msg)

            # Snapshot todo state before this iteration (for stall detection after)
            _todo_snapshot_before = _todo_status_snapshot(session_id)

            # Adaptive re-plan: inject queued re-plan message from previous iteration
            if profile.adaptive_replan_enabled and _replan_msg:
                built_ctx.append(Message(role="system", content=_replan_msg))
                _replan_msg = None

            if profile.anti_stall_enabled and iteration > 0:
                stalled = (
                    _stall_no_todo >= profile.anti_stall_threshold
                    or _stall_repeat_tools >= profile.anti_stall_threshold
                )
                if stalled:
                    reason = (
                        f"no todo progress for {_stall_no_todo} iterations"
                        if _stall_no_todo >= profile.anti_stall_threshold
                        else f"identical tool calls repeated {_stall_repeat_tools} times"
                    )
                    built_ctx.append(Message(
                        role="system",
                        content=(
                            f"[Anti-stall warning — {reason}] "
                            "You are repeating the same actions without making progress. "
                            "Stop and reconsider: change your approach, try a different tool, "
                            "mark the current step as failed and move on, or ask the user for guidance."
                        ),
                    ))

            try:
                self._check_context_size(built_ctx)
            except ContextTooLargeError as e:
                # Surface the error as a Navi response (not a raw system error) so the
                # user sees a coherent message and the exchange is saved to history.
                error_text = str(e)
                assistant_msg = Message(role="assistant", content=error_text)
                session.context.append(assistant_msg)
                session.messages.append(assistant_msg)
                await self._sessions.save(session)
                yield TextDelta(delta=error_text)
                yield StreamEnd(content=error_text)
                return

            async for chunk in _iter_stream_guarded(
                llm.stream_complete(
                    built_ctx,
                    tools=tool_schemas if tools else None,
                    temperature=profile.temperature,
                    model=profile.model,
                    think=profile.think_enabled,
                    top_k=profile.top_k,
                    top_p=profile.top_p,
                    num_thread=profile.num_thread,
                ),
                stop_event=stop_event,
                first_chunk_timeout=settings.llm_stream_first_chunk_timeout,
                chunk_timeout=settings.llm_stream_chunk_timeout,
            ):
                # Cooperative stop: break cleanly so aclose() is called on the
                # generator → Ollama stream closes gracefully, model stays in VRAM.
                if stop_event and stop_event.is_set():
                    if thinking_active:
                        yield ThinkingEnd()
                    break
                if chunk.prompt_tokens is not None or chunk.completion_tokens is not None:
                    context_tokens = (chunk.prompt_tokens or 0) + (chunk.completion_tokens or 0)
                if chunk.thinking:
                    accumulated_thinking += chunk.thinking
                    if not thinking_active:
                        thinking_active = True
                    yield ThinkingDelta(delta=chunk.thinking)
                elif chunk.delta:
                    if thinking_active:
                        thinking_active = False
                        yield ThinkingEnd()
                    accumulated_text += chunk.delta
                    yield TextDelta(delta=chunk.delta)
                if chunk.tool_calls:
                    turn_tool_calls = chunk.tool_calls
                if chunk.finish_reason and thinking_active:
                    thinking_active = False
                    yield ThinkingEnd()

            # Stopped mid-stream — save partial response and exit
            if stop_event and stop_event.is_set():
                if accumulated_text:
                    session.messages.append(Message(
                        role="assistant", content=accumulated_text,
                        created_at=datetime.now(timezone.utc),
                    ))
                await self._sessions.save(session)
                yield StreamStopped()
                return

            if not turn_tool_calls:
                # Final response — text already streamed above
                _elapsed = round(time.monotonic() - _turn_start, 1)
                # Net tokens = marginal cost of this turn (delta from baseline) + subagent tokens
                _net_tokens = max(0, (context_tokens or 0) - _prev_tokens) + _subagent_tokens
                assistant_msg = Message(
                    role="assistant",
                    content=accumulated_text or None,
                    thinking=accumulated_thinking or None,
                    created_at=datetime.now(timezone.utc),
                    elapsed_seconds=_elapsed,
                    tool_call_count=_tool_call_count if _tool_call_count else None,
                    token_count=_net_tokens if _net_tokens else None,
                )
                session.messages.append(assistant_msg)
                session.context.append(assistant_msg)
                session.context_token_count = context_tokens or 0
                await self._sessions.save(session)

                yield StreamEnd(
                    full_content=accumulated_text,
                    context_tokens=context_tokens,
                    max_context_tokens=settings.ollama_num_ctx,
                    elapsed_seconds=_elapsed,
                    tool_call_count=_tool_call_count,
                    token_count=_net_tokens if _net_tokens else None,
                )

                for event in await self._run_workers(session, llm, profile.model, context_tokens):
                    yield event
                return

            # Tool calls turn — record to session and execute
            assistant_msg = Message(
                role="assistant",
                content=accumulated_text or None,
                thinking=accumulated_thinking or None,
                tool_calls=turn_tool_calls,
            )
            session.messages.append(assistant_msg)
            session.context.append(assistant_msg)

            tool_map = {t.name: t for t in tools}
            for tc in turn_tool_calls:
                # 1. Announce immediately so the UI shows a pending card
                yield ToolStarted(tool_name=tc.name, arguments=tc.arguments)

                # 2. Create a sink queue for sub-agent events from this tool call.
                #    create_task() snapshots the current ContextVar values, so the
                #    task will inherit current_event_sink = sink.
                sink: asyncio.Queue = asyncio.Queue()
                sink_token = current_event_sink.set(sink)
                result_holder: list = []

                async def _run_with_sentinel(_tc=tc, _holder=result_holder, _sink=sink):
                    try:
                        _holder.append(await self._tool_executor._run_single_tool(_tc, tool_map))
                    except Exception as exc:
                        _holder.append(exc)
                    finally:
                        await _sink.put(_TOOL_DONE)

                tool_task = asyncio.create_task(_run_with_sentinel())
                current_event_sink.reset(sink_token)  # outer ctx restored; task has its own copy

                try:
                    # 3. Block on the sink until the sentinel arrives.
                    #    Sub-agent ToolStarted/ToolEvent objects come through here in real time.
                    while True:
                        item = await sink.get()
                        if item is _TOOL_DONE:
                            break
                        if isinstance(item, SubagentComplete):
                            _subagent_tokens += item.token_count
                            _tool_call_count += item.tool_call_count
                        elif isinstance(item, AIHelperTokensUsed):
                            _subagent_tokens += item.total
                        else:
                            yield item

                    # 4. Unpack result or handle exception
                    r = result_holder[0] if result_holder else RuntimeError("tool task produced no result")
                    if isinstance(r, Exception):
                        # Infrastructure errors (LLMBackendError / LLMConnectionError) abort the turn;
                        # everything else becomes a failed tool result so the loop can continue.
                        if isinstance(r, (LLMBackendError, LLMConnectionError)):
                            raise r
                        log.warning("agent.tool_exception", tool=tc.name, error=str(r))
                        tool_event = ToolEvent(
                            tool_name=tc.name, arguments=tc.arguments,
                            result=f"Error: {r}", success=False,
                        )
                        msg = Message(role="tool", content=f"Error: {r}", tool_call_id=tc.id, name=tc.name, metadata={})
                        image_msg = None
                    else:
                        tool_event, msg, image_msg = r

                    # 5. Yield the completed ToolEvent and record in session
                    _tool_call_count += 1
                    yield tool_event
                    session.messages.append(msg)
                    session.context.append(msg)
                    if image_msg:
                        session.context.append(image_msg)
                finally:
                    if not tool_task.done():
                        tool_task.cancel()
                    try:
                        await tool_task
                    except Exception:
                        pass

            # 6. Cooperative stop: check after tool execution before next LLM call
            if stop_event and stop_event.is_set():
                await self._sessions.save(session)
                yield StreamStopped()
                return

            # Update anti-stall counters after all tools in this iteration ran.
            if profile.anti_stall_enabled:
                # Todo progress signal
                if _todo_status_snapshot(session_id) != _todo_snapshot_before:
                    _stall_no_todo = 0
                else:
                    _stall_no_todo += 1

                # Repeated tool call signal
                cur_sigs = frozenset(
                    (tc.name, json.dumps(tc.arguments, sort_keys=True))
                    for tc in (turn_tool_calls or [])
                )
                if cur_sigs and cur_sigs == _prev_tool_sigs:
                    _stall_repeat_tools += 1
                else:
                    _stall_repeat_tools = 0
                _prev_tool_sigs = cur_sigs

            # Adaptive re-plan: detect steps that were newly marked failed this iteration.
            if profile.adaptive_replan_enabled:
                current_failed = _todo_failed_steps(session_id)
                new_failures = current_failed - _known_failed
                _known_failed = current_failed
                if new_failures:
                    failed_labels = ", ".join(
                        f'step {idx} ("{text}")'
                        for idx, text in sorted(new_failures)
                    )
                    _replan_msg = (
                        f"[Adaptive re-plan] {failed_labels} just failed. "
                        "Before continuing, revise your plan with the todo tool: either replace the remaining "
                        "pending steps or mark failed/skipped steps with validation. Then continue execution "
                        "with an approach that accounts for what went wrong."
                    )
                    log.info("agent.adaptive_replan_queued", failures=len(new_failures),
                             session_id=session_id)

            # 7. If switch_profile was called this iteration, reload profile + tools.
            #    switch_profile updates the DB but run_stream() holds a local session
            #    object — without this check the final save would overwrite the change
            #    and the next LLM call would still use the old tool schemas.
            fresh = await self._sessions.get(session_id)
            if fresh and fresh.profile_id != session.profile_id:
                session.profile_id = fresh.profile_id
                profile = self._profiles.get(session.profile_id)
                tools = self._tool_list(profile.enabled_tools, profile.mcp_servers)
                tool_schemas = [t.schema() for t in tools]
                llm = self._get_backend(profile.llm_backend)
                log.info(
                    "agent.profile_reloaded",
                    session_id=session_id,
                    new_profile=session.profile_id,
                )

        await self._sessions.save(session)
        raise MaxIterationsReached(profile.max_iterations)

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    async def _run_workers(
        self,
        session,
        llm: LLMBackend,
        model: str,
        context_tokens: int | None,
    ) -> list[AgentEvent]:
        """Invoke every configured worker in order and gather the events they emit.

        All workers share one ``WorkerContext`` built from the session id, the
        supplied token count, the configured context window, the LLM/model pair,
        the summary temperature and the session store.

        A worker that raises is logged (with traceback) and skipped; the
        remaining workers still run.

        Returns:
            The concatenated ``events`` of every worker that completed.
        """
        from navi.workers.base import WorkerContext

        collected: list[AgentEvent] = []
        worker_ctx = WorkerContext(
            session_id=session.id,
            context_tokens=context_tokens,
            max_context_tokens=settings.ollama_num_ctx,
            llm=llm,
            model=model,
            temperature=settings.context_summary_temperature,
            session_store=self._sessions,
        )

        for current in self._workers:
            try:
                outcome = await current.run(session, worker_ctx)
                collected.extend(outcome.events)
            except Exception:
                # Best effort: one failing worker must not block the others.
                log.warning(
                    "agent.worker_failed",
                    worker=type(current).__name__,
                    exc_info=True,
                )

        return collected

    async def _compress_session_context(
        self,
        session,
        llm: LLMBackend,
        model: str,
        session_id: str,
        reason: str,
        keep_recent_messages: int | None = None,
    ) -> ContextCompressed | None:
        """Replace ``session.context`` with a compressed version and save the session.

        Args:
            session: Session whose ``context`` is compressed; mutated in place.
            llm: Backend handed to ``compress_context``.
            model: Model name handed to ``compress_context``.
            session_id: Used only for log correlation.
            reason: Free-form tag recorded in the log entries.
            keep_recent_messages: Forwarded unchanged to ``compress_context``.

        Returns:
            A ``ContextCompressed`` event describing the change, or ``None`` when
            ``compress_context`` raised (logged as a warning) or returned ``None``.
        """
        try:
            outcome = await compress_context(
                context=session.context,
                llm=llm,
                model=model,
                temperature=settings.context_summary_temperature,
                keep_recent=settings.context_keep_recent,
                max_tokens=settings.context_summary_max_tokens,
                keep_recent_messages=keep_recent_messages,
            )
        except Exception:
            # Compression is best effort: log and carry on with the old context.
            log.warning(
                "agent.context_compress_failed",
                session_id=session_id,
                reason=reason,
                exc_info=True,
            )
            return None

        if outcome is None:
            return None

        compressed, summary = outcome
        size_before = len(session.context)

        # Swap in the compressed context and zero the stored token count.
        session.context = compressed
        session.context_token_count = 0

        # Display history keeps every message; record the summary as a marker entry.
        marker = Message(
            role="system",
            is_compression=True,
            content=summary,
        )
        session.messages.append(marker)
        await self._sessions.save(session)

        size_after = len(compressed)
        log.info(
            "agent.context_compress",
            session_id=session_id,
            reason=reason,
            before=size_before,
            after=size_after,
        )

        return ContextCompressed(
            messages_before=size_before,
            messages_after=size_after,
            summary=summary,
            context_tokens=session.context_token_count,
            max_context_tokens=settings.ollama_num_ctx,
        )

    @staticmethod
    def _estimate_context_tokens(context: list[Message]) -> int:
        """Conservative local estimate used before the next LLM call returns real token counts."""
        chars = sum(len(m.content or "") for m in context)
        imgs = sum(500 for m in context if m.images)
        return chars // 4 + imgs

    def _tool_list(
        self,
        enabled: list[str],
        mcp_servers: dict[str, list[str]] | None = None,
    ) -> list[Tool]:
        """Resolve the tool names a profile may use into registered ``Tool`` objects.

        Names are collected in this order, each name at most once:

        1. ``enabled`` (the profile's own list),
        2. the names returned by ``_load_user_enabled_tools()``,
        3. MCP tools: for every server in ``mcp_servers`` either all registered
           tools whose name starts with ``mcp_<server>_`` (when the group list
           contains ``"*"``) or the tools the MCP manager resolves for each
           named group. MCP expansion is skipped when no MCP manager is set.

        Args:
            enabled: Tool names enabled by the profile.
            mcp_servers: Mapping of MCP server name to group names (``"*"`` = all).

        Returns:
            The tools found in the registry, in collection order. Names whose
            registry lookup raises are skipped and reported with a warning log
            instead of being dropped silently.
        """
        names: list[str] = []
        seen: set[str] = set()

        def _add(name: str) -> None:
            # Order-preserving de-duplication with O(1) membership checks.
            if name not in seen:
                seen.add(name)
                names.append(name)

        for name in enabled:
            _add(name)
        for name in _load_user_enabled_tools():
            _add(name)

        # Expand MCP server groups into concrete tool names.
        if mcp_servers and self._mcp_manager:
            registered_names: list[str] | None = None  # fetched lazily, at most once
            for server_name, groups in mcp_servers.items():
                prefix = f"mcp_{server_name}_"
                if "*" in groups:
                    # Wildcard: every registered tool belonging to this server.
                    if registered_names is None:
                        registered_names = [tool.name for tool in self._tools.all()]
                    for tool_name in registered_names:
                        if tool_name.startswith(prefix):
                            _add(tool_name)
                else:
                    for group_name in groups:
                        for tool_name in self._mcp_manager.resolve_group(server_name, group_name):
                            _add(f"{prefix}{tool_name}")

        result: list[Tool] = []
        for name in names:
            try:
                result.append(self._tools.get(name))
            except Exception as exc:
                # Still best effort (an unknown tool must not abort the turn),
                # but make the misconfiguration visible.
                log.warning("agent.tool_unavailable", tool=name, error=str(exc))
        return result

    def _get_backend(self, backend_key: str) -> LLMBackend:
        return self._backends.get(backend_key)

    def _check_context_size(self, context: list[Message]) -> None:
        """Raise ContextTooLargeError before an LLM call if the context is dangerously large.

        Token counts come from ``_estimate_context_tokens`` (the same estimator
        used by the pre-call compression check): ~4 chars per token for text,
        plus 500 tokens for each message that carries images.

        Checks against the *remaining* budget, not a fixed percentage of the window:
            available_for_input = ollama_num_ctx - output_reserve
        where output_reserve is a fixed token headroom reserved for the model's response.
        This correctly accounts for sessions where conversation history already consumes
        a large portion of the window.

        Args:
            context: The fully built message list about to be sent to the LLM.
                An empty list is accepted without any check.

        Raises:
            ContextTooLargeError: When the estimate for the whole context exceeds
                the available input budget. The message reports the last message
                as the "new content" and everything before it as already used.
        """
        if not context:
            return

        output_reserve = settings.output_reserve_tokens
        available = settings.ollama_num_ctx - output_reserve

        total = self._estimate_context_tokens(context)
        if total <= available:
            return

        # Split the estimate so the error can say how much the last message
        # adds versus what was already in the window.
        existing = self._estimate_context_tokens(context[:-1])
        new = self._estimate_context_tokens(context[-1:])
        remaining = available - existing
        raise ContextTooLargeError(
            f"Context too large: new content is ~{new:,} estimated tokens, "
            f"but only ~{max(0, remaining):,} tokens are available "
            f"(window {settings.ollama_num_ctx:,}, already used ~{existing:,}, "
            f"output_reserve {output_reserve:,}). "
            "Split the file into smaller parts or delegate to a subagent."
        )