Newer
Older
navi-1 / navi / core / agent.py
"""
Agent: the tool-calling loop.

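Flow:
1. Receive user message, load session + profile
2. Build tool schemas from the profile's agent tool scope
3. If session.context is already past the compression threshold, compress it
   first; then append the user message and, on the first user message of the
   session (or when the profile enables it), run the planning phase
4. Loop (up to max_iterations):
   a. Call LLM with session.context (may be compressed) + tool schemas,
      streaming thinking/text deltas as they arrive
   b. If the model returned no tool calls -> the streamed text is the final
      response (StreamEnd)
   c. If it returned tool calls -> execute tools, append to both
      session.messages (display history) and session.context (LLM context)
5. After StreamEnd: run workers sequentially (e.g. context compression)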

session.messages — full display history, never compressed
session.context  — what the LLM sees; workers may compress this
"""

import asyncio
import base64
import io
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import TYPE_CHECKING, AsyncGenerator

import structlog
from PIL import Image

from navi.config import settings
from navi.exceptions import ContextTooLargeError, LLMBackendError, LLMConnectionError, MaxIterationsReached, SessionNotFound
from navi.llm.base import LLMBackend, Message, ToolCallRequest
from navi.tools._internal.base import Tool, ToolContext, current_event_sink, current_stop_event, current_user_role, current_user_info

from .agent_run_context import AgentTurnContext, StreamState
from .anti_stall import AntiStallMonitor
from .compressor import ContextCompressor, should_compress
from .context_builder import ContextBuilder
from .planning import PlanningEngine
from .stream_guard import _iter_stream_guarded
from .subagent_runner import SubAgentRunner
from .tool_utils import build_tool_list, load_user_enabled_tools
from .events import (
    AgentEvent,
    AIHelperTokensUsed,
    CompressionStarted,
    ContextCompressed,
    PlanningDebugData,
    PlanningStatus,
    PlanReady,
    StreamEnd,
    StreamStopped,
    SubagentComplete,
    TextDelta,
    ThinkingDelta,
    ThinkingEnd,
    ToolEvent,
    ToolStarted,
    TurnThinking,
)
from .registry import BackendRegistry, ProfileRegistry, ToolRegistry
from .session import SessionStore
from .tool_executor import ToolExecutor

if TYPE_CHECKING:
    from navi.context_providers._loader import ContextProviderRegistry
    from navi.memory.store import MemoryStore
    from navi.workers.base import Worker, WorkerContext



# Module-wide structlog logger.
log = structlog.get_logger()

# Sentinel: placed in the event sink by the tool wrapper to signal completion.
# A fresh object() so it can never be equal/identical to a real event the tool
# puts on the sink; compared with ``is``.
_TOOL_DONE = object()


async def _todo_progress_message(session_id: str, *, first_iteration: bool = False) -> "Message | None":
    """Return the todo-progress system reminder for *session_id*, or None.

    Thin async wrapper over ``navi.tools.todo.get_progress_message``. The
    import is deferred until call time (NOTE(review): presumably to avoid an
    import cycle between core and tools — confirm).
    """
    from navi.tools.todo import get_progress_message as _build_reminder

    reminder = await _build_reminder(session_id, first_iteration=first_iteration)
    return reminder


class Agent:
    def __init__(
        self,
        session_store: "SessionStore | None",
        profile_registry: ProfileRegistry,
        tool_registry: ToolRegistry,
        backend_registry: BackendRegistry,
        workers: list["Worker"] | None = None,
        memory_store: "MemoryStore | None" = None,
        cp_registry: "ContextProviderRegistry | None" = None,
        mcp_manager=None,
    ) -> None:
        """Wire the agent to its stores/registries and build internal helpers.

        The context builder, tool executor, planning engine and compressor are
        created here and shared with the SubAgentRunner, so sub-agents use the
        same instances as the main loop.
        """
        # Injected collaborators.
        self._sessions = session_store
        self._profiles = profile_registry
        self._tools = tool_registry
        self._backends = backend_registry
        self._workers: list["Worker"] = workers if workers else []
        self._memory = memory_store
        self._cp_registry = cp_registry
        self._mcp_manager = mcp_manager

        # Internal components derived from the collaborators above.
        builder = ContextBuilder(
            profile_registry=profile_registry,
            memory_store=memory_store,
            cp_registry=cp_registry,
            mcp_manager=mcp_manager,
        )
        executor = ToolExecutor(tool_registry)
        planner = PlanningEngine(builder)
        compressor = ContextCompressor()

        self._ctx_builder = builder
        self._tool_executor = executor
        self._planning = planner
        self._compressor = compressor
        self._subagent = SubAgentRunner(
            profile_registry=profile_registry,
            tool_registry=tool_registry,
            backend_registry=backend_registry,
            ctx_builder=builder,
            compressor=compressor,
            planning=planner,
            tool_executor=executor,
            session_store=session_store,
            mcp_manager=mcp_manager,
        )

    # ------------------------------------------------------------------
    # Public interface
    # ------------------------------------------------------------------

    async def run(self, session_id: str, user_message: str, images: list[str] | None = None, files: list[dict] | None = None, is_recall: bool = False) -> str:
        """Non-streaming wrapper around run_stream().

        Drains the whole event stream and returns the ``full_content`` of the
        last StreamEnd seen ("" if none was emitted, e.g. after a stop).
        """
        final_text = ""
        events = self.run_stream(session_id, user_message, images=images, files=files, is_recall=is_recall)
        async for ev in events:
            if not isinstance(ev, StreamEnd):
                continue
            final_text = ev.full_content or ""
        return final_text

    async def run_ephemeral(
        self,
        user_message: str,
        profile_id: str,
        max_iterations: int = 40,
        exclude_tools: list[str] | None = None,
        briefing: str | None = None,
        custom_system_prompt: str | None = None,
        inherit_system_prompt: bool = False,
        context_transfer: str | None = None,
        parent_session_id: str | None = None,
        timeout_seconds: float = 300.0,
    ) -> tuple[str, bool]:
        """Run a one-off sub-agent by delegating to SubAgentRunner.run.

        All arguments are forwarded unchanged as keywords. The returned pair is
        whatever SubAgentRunner.run returns (NOTE(review): the meaning of the
        bool is defined there, not visible here).
        """
        forwarded = dict(
            user_message=user_message,
            profile_id=profile_id,
            max_iterations=max_iterations,
            exclude_tools=exclude_tools,
            briefing=briefing,
            custom_system_prompt=custom_system_prompt,
            inherit_system_prompt=inherit_system_prompt,
            context_transfer=context_transfer,
            parent_session_id=parent_session_id,
            timeout_seconds=timeout_seconds,
        )
        return await self._subagent.run(**forwarded)

    async def run_stream(
        self,
        session_id: str,
        user_message: str,
        images: list[str] | None = None,
        display_message: str | None = None,
        files: list[dict] | None = None,
        is_recall: bool = False,
    ) -> AsyncGenerator[AgentEvent, None]:
        """
        Streaming variant. Yields AgentEvent objects:
        - CompressionStarted / ContextCompressed: pre-turn or mid-turn context
          compression (workers may also emit ContextCompressed after StreamEnd)
        - planning events forwarded from PlanningEngine (e.g. PlanReady)
        - ThinkingDelta / ThinkingEnd: reasoning chunks
        - ToolStarted / ToolEvent: tool call + result
        - TextDelta / StreamEnd: final streamed response
        - StreamStopped: the user requested a cooperative stop

        Args:
            session_id: Session to run this turn in.
            user_message: The text sent to the LLM (may contain injected hints).
            images: Optional base64-encoded images attached to the message.
            display_message: Optional text shown to the user in the chat UI.
                When omitted, user_message is used for both LLM context and display.
            files: Optional file attachments, stored on both message copies.
            is_recall: Forwarded to both stored user messages.

        Raises:
            SessionNotFound: if *session_id* does not exist.
            MaxIterationsReached: if the model is still calling tools after
                ``profile.max_iterations`` iterations.
        """
        session = await self._sessions.get(session_id)
        if session is None:
            raise SessionNotFound(session_id)

        # Decide "first message" BEFORE this turn is appended: every turn stores
        # two role="user" messages (a display copy and a context copy), so a
        # count taken after the append can never be exactly 1.
        is_first_message = not any(m.role == "user" for m in session.messages)

        profile = self._profiles.get(session.profile_id)
        tools = self._tool_list(profile.get_agent_tools())
        tool_schemas = [t.schema() for t in tools]
        llm = self._get_backend(profile.llm_backend)

        mem = await self._ctx_builder._memory_msg(user_id=session.user_id)

        # Expose session_id and model to tools via ContextVar
        from navi.tools._internal.base import current_session_id as _sid_var, current_model as _model_var
        _sid_var.set(session_id)
        _model_var.set(profile.model)

        # Pre-turn compression: if the last turn filled the context past the
        # threshold, compress NOW before calling the LLM.  This prevents the
        # model from seeing an over-full context and generating gibberish
        # (e.g. a "summary of the conversation" instead of a real answer).
        async for _ev in self._compression_events_preturn(session, llm, profile, session_id):
            yield _ev

        display_text = display_message if display_message is not None else user_message
        user_msg_display = Message(role="user", content=display_text, images=images or None,
                                   files=files or None, created_at=datetime.now(timezone.utc),
                                   is_recall=is_recall, is_context=False)

        # Image token budgeting: only as many images as fit go into the LLM
        # context; the rest are saved to disk and referenced by name.
        images_for_context, context_content = self._fit_images_to_context(
            session, session_id, images, user_message
        )

        user_msg_context = Message(role="user", content=context_content, images=images_for_context or None,
                                   files=files or None, created_at=datetime.now(timezone.utc),
                                   is_recall=is_recall, is_display=False)
        session.messages.append(user_msg_display)
        session.messages.append(user_msg_context)
        session.context.append(user_msg_context)
        # Persist user message immediately so it survives a client disconnect
        # before the assistant reply is ready.
        await self._sessions.save(session)

        stop_event = current_stop_event.get()
        turn_ctx = AgentTurnContext(turn_start=time.monotonic())

        # Planning phase — always runs on the first user message in a session;
        # on subsequent messages uses the profile's planning_enabled flag.
        # force_plan suppresses the DIRECT shortcut: first message is always forced,
        # and planning_mandatory extends that to every subsequent message.
        force_plan = is_first_message or profile.planning_mandatory
        if is_first_message or profile.planning_enabled:
            # Cap to prevent unbounded DB growth on long sessions
            max_planning_logs = 20
            async for _ev in self._planning.run(session.context, profile, llm, mem, tool_schemas,
                                                messages=session.messages, force_plan=force_plan):
                if isinstance(_ev, AIHelperTokensUsed):
                    turn_ctx.subagent_tokens += _ev.completion_tokens
                elif isinstance(_ev, PlanningDebugData):
                    session.planning_logs.append(_ev.log)
                    if len(session.planning_logs) > max_planning_logs:
                        session.planning_logs = session.planning_logs[-max_planning_logs:]
                else:
                    yield _ev

        # Context-provider injections and memory facts are fetched concurrently.
        ctx_injections, mem_facts = await asyncio.gather(
            self._ctx_builder._collect_context_injections(profile),
            self._ctx_builder._memory_facts_msg(
                user_message, user_id=session.user_id, injected_ids=turn_ctx.injected_fact_ids
            ),
        )
        if mem_facts:
            ctx_injections.append(mem_facts)
            log.debug("agent.memory_facts_injected", session_id=session_id, facts_msg_length=len(mem_facts.content or ""))
        else:
            log.debug("agent.memory_facts_none", session_id=session_id)

        anti_stall = AntiStallMonitor(profile)
        if profile.anti_stall_enabled or profile.adaptive_replan_enabled:
            await anti_stall.init(session_id)

        # Tool-calling loop — uses stream_complete() for every turn so thinking
        # is captured in real-time via ThinkingDelta/ThinkingEnd events.
        for iteration in range(profile.max_iterations):
            # Cooperative stop: check before each LLM call
            if stop_event and stop_event.is_set():
                await self._sessions.save(session)
                yield StreamStopped()
                return

            async for _ev in self._compression_events_midturn(session, llm, profile, session_id, iteration, ctx_injections, mem):
                yield _ev

            state = StreamState()

            built_ctx = self._ctx_builder.build(session.context, profile, mem,
                                                iteration=iteration, max_iterations=profile.max_iterations,
                                                extra_system=ctx_injections,
                                                session_id=session_id)

            if (
                profile.goal_anchoring_enabled
                and iteration > 0
                and iteration % profile.goal_anchoring_interval == 0
            ):
                built_ctx.append(await self._ctx_builder._build_goal_anchor(session_id, user_message))

            todo_msg = await _todo_progress_message(session_id, first_iteration=(iteration == 0))
            if todo_msg:
                built_ctx.append(todo_msg)

            # Anti-stall / adaptive re-plan: inject intervention message if needed
            _stall_msg = await anti_stall.pre_turn(session_id, iteration)
            if _stall_msg:
                built_ctx.append(_stall_msg)

            try:
                self._compressor.check_context_size(built_ctx)
            except ContextTooLargeError as e:
                # Surface the error as a Navi response (not a raw system error) so the
                # user sees a coherent message and the exchange is saved to history.
                error_text = str(e)
                assistant_msg = Message(role="assistant", content=error_text,
                                        created_at=datetime.now(timezone.utc))
                session.context.append(assistant_msg)
                session.messages.append(assistant_msg)
                await self._sessions.save(session)
                yield TextDelta(delta=error_text)
                # StreamEnd carries the text in ``full_content`` (read by run()).
                yield StreamEnd(full_content=error_text)
                return

            async for _ev in self._consume_stream(
                _iter_stream_guarded(
                    llm.stream_complete(
                        built_ctx,
                        tools=tool_schemas if tools else None,
                        temperature=profile.temperature,
                        model=profile.model,
                        think=profile.think_enabled,
                        top_k=profile.top_k,
                        top_p=profile.top_p,
                        num_thread=profile.num_thread,
                    ),
                    stop_event=stop_event,
                    first_chunk_timeout=settings.llm_stream_first_chunk_timeout,
                    chunk_timeout=settings.llm_stream_chunk_timeout,
                ),
                stop_event,
                turn_ctx,
                state,
            ):
                yield _ev

            # Stopped mid-stream — save partial response and exit
            if stop_event and stop_event.is_set():
                if state.accumulated_text:
                    session.messages.append(Message(
                        role="assistant", content=state.accumulated_text,
                        created_at=datetime.now(timezone.utc),
                        is_context=False,
                    ))
                await self._sessions.save(session)
                yield StreamStopped()
                return

            turn_tool_calls = state.turn_tool_calls

            if not turn_tool_calls:
                # Final response — text already streamed above
                _elapsed = round(time.monotonic() - turn_ctx.turn_start, 1)
                # Net tokens = accumulated across all iterations + subagent tokens
                _net_tokens = turn_ctx.turn_tokens + turn_ctx.subagent_tokens
                assistant_msg = Message(
                    role="assistant",
                    content=state.accumulated_text or None,
                    thinking=state.accumulated_thinking or None,
                    created_at=datetime.now(timezone.utc),
                    elapsed_seconds=_elapsed,
                    tool_call_count=turn_ctx.tool_call_count if turn_ctx.tool_call_count else None,
                    token_count=_net_tokens if _net_tokens else None,
                )
                session.messages.append(assistant_msg)
                session.context.append(assistant_msg)
                session.context_token_count = state.context_tokens or 0
                await self._sessions.save(session)

                yield StreamEnd(
                    full_content=state.accumulated_text,
                    context_tokens=state.context_tokens,
                    max_context_tokens=settings.ollama_num_ctx,
                    elapsed_seconds=_elapsed,
                    tool_call_count=turn_ctx.tool_call_count,
                    token_count=_net_tokens if _net_tokens else None,
                )

                for event in await self._run_workers(session, llm, profile.model, state.context_tokens):
                    yield event
                return

            # Tool calls turn — record to session and execute
            assistant_msg = Message(
                role="assistant",
                content=state.accumulated_text or None,
                thinking=state.accumulated_thinking or None,
                tool_calls=turn_tool_calls,
            )
            session.messages.append(assistant_msg)
            session.context.append(assistant_msg)

            tool_ctx = ToolContext(
                session_id=session_id,
                event_sink=None,  # set per-tool inside _execute_tools_with_sink
                stop_event=stop_event,
                model=profile.model,
                user_id=session.user_id,
                user_role=current_user_role.get(),
                user_info=current_user_info.get(),
            )
            last_tool_event = None
            async for _ev in self._execute_tools_with_sink(turn_tool_calls, tools, turn_ctx, session, stop_event, tool_ctx):
                yield _ev
                last_tool_event = _ev

            # Cooperative stop: check after tool execution before next LLM call.
            # When a tool was interrupted, _execute_tools_with_sink has already
            # saved the session and emitted StreamStopped as its final event —
            # don't save again or emit a duplicate StreamStopped.
            if stop_event and stop_event.is_set():
                if not isinstance(last_tool_event, StreamStopped):
                    await self._sessions.save(session)
                    yield StreamStopped()
                return

            # Update anti-stall counters and adaptive-replan state
            await anti_stall.post_turn(session_id, turn_tool_calls)

            # If switch_profile was called this iteration, reload profile + tools.
            # switch_profile updates the DB but run_stream() holds a local session
            # object — without this check the final save would overwrite the change
            # and the next LLM call would still use the old tool schemas.
            fresh = await self._sessions.get(session_id)
            if fresh and fresh.profile_id != session.profile_id:
                session.profile_id = fresh.profile_id
                profile = self._profiles.get(session.profile_id)
                tools = self._tool_list(profile.get_agent_tools())
                tool_schemas = [t.schema() for t in tools]
                llm = self._get_backend(profile.llm_backend)
                # Keep the tool-visible model ContextVar in sync with the new profile.
                _model_var.set(profile.model)
                log.info(
                    "agent.profile_reloaded",
                    session_id=session_id,
                    new_profile=session.profile_id,
                )

        await self._sessions.save(session)
        raise MaxIterationsReached(profile.max_iterations)

    def _fit_images_to_context(
        self,
        session,
        session_id: str,
        images: list[str] | None,
        user_message: str,
    ) -> tuple[list[str] | None, str]:
        """Decide which attached images go into the LLM context this turn.

        The budget is 80% of ``settings.ollama_num_ctx`` minus the current
        estimate of ``session.context``, at a flat 500 tokens per image. Images
        beyond the budget are base64-decoded, converted to RGB, downscaled so
        the longer side is at most 1024px, and saved as JPEG in the session
        directory (so Navi can open them later via image_view); their file
        names are appended to the returned context text. Saving is best-effort:
        failures are logged and the affected image is skipped.

        Returns:
            ``(images_for_context, context_content)``. When nothing overflows,
            *images* and *user_message* are returned unchanged.
        """
        if not images:
            return images, user_message

        current_tokens = self._compressor.estimate_context_tokens(session.context)
        available_tokens = int(settings.ollama_num_ctx * 0.8) - current_tokens
        max_images = max(0, available_tokens // 500)
        if max_images >= len(images):
            return images, user_message

        saved_names: list[str] = []
        try:
            session_dir = Path(settings.session_files_dir) / session_id
            session_dir.mkdir(parents=True, exist_ok=True)
        except Exception:
            log.warning("agent.image_overflow_dir_failed", session_id=session_id, exc_info=True)
        else:
            next_idx = 0
            for b64 in images[max_images:]:
                # Never overwrite an earlier turn's overflow image: its name is
                # already referenced in a stored context message.
                while (session_dir / f"uploaded_{next_idx}.jpg").exists():
                    next_idx += 1
                name = f"uploaded_{next_idx}.jpg"
                try:
                    img = Image.open(io.BytesIO(base64.b64decode(b64))).convert("RGB")
                    w, h = img.size
                    if w > 1024 or h > 1024:
                        ratio = 1024 / max(w, h)
                        new_size = (max(1, int(w * ratio)), max(1, int(h * ratio)))
                        img = img.resize(new_size, Image.LANCZOS)
                    img.save(session_dir / name, format="JPEG", quality=85, optimize=True)
                except Exception:
                    # One undecodable image must not drop the remaining ones.
                    log.warning("agent.image_overflow_save_failed", session_id=session_id, exc_info=True)
                    continue
                saved_names.append(name)
                next_idx += 1

        context_content = user_message
        if saved_names:
            context_content += (
                f"\n\n[Additional images saved to session directory: {', '.join(saved_names)}]"
            )
        return images[:max_images], context_content

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    async def _run_workers(
        self,
        session,
        llm: LLMBackend,
        model: str,
        context_tokens: int | None,
    ) -> list[AgentEvent]:
        """Run every configured worker in order and gather the events they emit.

        All workers share one WorkerContext. A worker that raises is logged
        (``agent.worker_failed``) and skipped, so one failure cannot stop the
        remaining workers or fail the turn.
        """
        from navi.workers.base import WorkerContext

        worker_ctx = WorkerContext(
            session_id=session.id,
            context_tokens=context_tokens,
            max_context_tokens=settings.ollama_num_ctx,
            llm=llm,
            model=model,
            temperature=settings.context_summary_temperature,
            session_store=self._sessions,
        )
        collected: list[AgentEvent] = []
        for worker in self._workers:
            try:
                outcome = await worker.run(session, worker_ctx)
                collected += outcome.events
            except Exception:
                log.warning("agent.worker_failed",
                            worker=type(worker).__name__, exc_info=True)
        return collected

    async def _do_compress_and_save(
        self,
        session,
        llm: LLMBackend,
        model: str,
        session_id: str,
        reason: str,
        keep_recent_messages: int | None = None,
    ) -> ContextCompressed | None:
        """Compress session.context and persist it, returning a UI event when it changed.

        Args:
            session: The live session; its ``context``, ``messages`` and
                ``context_token_count`` are mutated in place.
            llm: Backend the compressor uses to write the summary.
            model: Model name passed to the compressor.
            session_id: Used for message archiving and logging.
            reason: Label recorded in the ``agent.context_compress`` log line only.
            keep_recent_messages: Forwarded unchanged to ``compress_session``.

        Returns:
            A ``ContextCompressed`` event, or ``None`` when ``compress_session``
            returned ``None``; in that case nothing is mutated or saved.
        """
        count_before = len(session.context)
        result = await self._compressor.compress_session(
            context=session.context,
            llm=llm,
            model=model,
            temperature=settings.context_summary_temperature,
            keep_recent=settings.context_keep_recent,
            max_tokens=settings.context_summary_max_tokens,
            keep_recent_messages=keep_recent_messages,
        )
        if result is None:
            return None
        new_context, summary_text = result

        # Mark messages that are no longer part of the LLM context.
        # Membership is by object identity (id()), not equality — presumably
        # because new_context reuses the same Message objects for the messages
        # it keeps. System messages keep their is_context flag untouched.
        new_context_ids = {id(m) for m in new_context}
        for msg in session.messages:
            if id(msg) not in new_context_ids and msg.role != "system":
                msg.is_context = False

        # The summary returned by the compressor must also live in messages so
        # save() writes it to the normalized table, but it is not displayed.
        # NOTE(review): ``not in`` here compares by equality (==), unlike the
        # identity check above.
        summary_msg = next((m for m in new_context if m.is_summary), None)
        if summary_msg and summary_msg not in session.messages:
            summary_msg.is_display = False
            session.messages.append(summary_msg)

        # UI marker showing that compression happened (display-only: is_context=False)
        session.messages.append(Message(
            role="system",
            is_compression=True,
            is_context=False,
            content=summary_text,
        ))

        # Swap in the compressed context and refresh the token estimate, then persist.
        session.context = new_context
        session.context_token_count = self._compressor.estimate_context_tokens(new_context)
        await self._sessions.save(session)

        # Archive old messages if the hot table exceeds the configured window.
        # A window of 0 (or less) disables archiving.
        if settings.session_messages_window > 0 and session.db_next_sequence > settings.session_messages_window:
            threshold = session.db_next_sequence - settings.session_messages_window
            archived = await self._sessions.archive_old_messages(session_id, threshold)
            if archived > 0:
                log.info(
                    "agent.archive_messages",
                    session_id=session_id,
                    archived=archived,
                    threshold=threshold,
                )

        log.info(
            "agent.context_compress",
            session_id=session_id,
            reason=reason,
            before=count_before,
            after=len(new_context),
        )

        return ContextCompressed(
            messages_before=count_before,
            messages_after=len(new_context),
            summary=summary_text,
            context_tokens=session.context_token_count,
            max_context_tokens=settings.ollama_num_ctx,
        )

    def _tool_list(
        self,
        scope: "ToolScopeConfig",
    ) -> list[Tool]:
        """Resolve a profile tool scope into concrete Tool objects.

        Passes the scope's native and MCP parts, together with this agent's
        tool registry and MCP manager, to ``build_tool_list``.
        NOTE(review): ``ToolScopeConfig`` is a string annotation only; it is
        not imported in this module.
        """
        native_scope, mcp_scope = scope.native, scope.mcp
        return build_tool_list(native_scope, mcp_scope, self._tools, self._mcp_manager)

    def _get_backend(self, backend_key: str) -> LLMBackend:
        """Return the LLM backend registered under *backend_key*."""
        backend = self._backends.get(backend_key)
        return backend

    async def _compression_events_preturn(self, session, llm, profile, session_id):
        """Compress session.context before the turn if it is already over the threshold.

        Does nothing when compression is disabled or the context holds two or
        fewer messages. Otherwise, if ``should_compress`` fires, yields
        CompressionStarted, then the ContextCompressed event returned by
        ``_do_compress_and_save`` (if any).
        """
        if not settings.context_compression_enabled or len(session.context) <= 2:
            return

        # Estimate once: the context does not change between the threshold
        # check and the CompressionStarted event.
        context_tokens = self._compressor.estimate_context_tokens(session.context)
        if not should_compress(
            context_tokens,
            settings.ollama_num_ctx,
            settings.context_compression_threshold,
        ):
            return

        yield CompressionStarted(
            context_tokens=context_tokens,
            max_context_tokens=settings.ollama_num_ctx,
        )
        event = await self._do_compress_and_save(
            session=session,
            llm=llm,
            model=profile.model,
            session_id=session_id,
            reason="preturn",
        )
        if event:
            yield event

    async def _compression_events_midturn(self, session, llm, profile, session_id, iteration, ctx_injections, mem):
        """Compress between tool iterations when the built context nears the limit.

        Skipped when compression is disabled and on iteration 0. The estimate is
        taken on a context built the same way as the real LLM call (profile,
        memory, extra system injections), but without per-iteration extras.
        Keeps at least 12 recent messages (or twice ``context_keep_recent``).
        """
        if not settings.context_compression_enabled or iteration <= 0:
            return

        preflight = self._ctx_builder.build(
            session.context,
            profile,
            mem,
            extra_system=ctx_injections,
            session_id=session_id,
        )
        tokens = self._compressor.estimate_context_tokens(preflight)
        over_threshold = should_compress(
            tokens,
            settings.ollama_num_ctx,
            settings.context_compression_threshold,
        )
        if not over_threshold:
            return

        yield CompressionStarted(
            context_tokens=tokens,
            max_context_tokens=settings.ollama_num_ctx,
        )
        keep = max(12, settings.context_keep_recent * 2)
        compressed = await self._do_compress_and_save(
            session=session,
            llm=llm,
            model=profile.model,
            session_id=session_id,
            reason="midturn",
            keep_recent_messages=keep,
        )
        if compressed:
            yield compressed

    async def _consume_stream(self, stream_gen, stop_event, turn_ctx: AgentTurnContext, state: StreamState):
        """Drain one LLM stream into *state* / *turn_ctx*, re-yielding UI events.

        Accumulates thinking and visible text on *state*, records the latest
        prompt token count as ``state.context_tokens``, adds completion tokens to
        ``turn_ctx.turn_tokens``, and keeps the most recent non-empty tool_calls.
        ThinkingEnd is emitted when visible text starts, when a chunk carries a
        finish_reason, or when a stop is requested mid-thinking.
        """
        async for part in stream_gen:
            if stop_event and stop_event.is_set():
                # Close an open thinking block before bailing out.
                if state.thinking_active:
                    yield ThinkingEnd()
                break

            if part.prompt_tokens is not None:
                state.context_tokens = part.prompt_tokens
            if part.completion_tokens is not None:
                turn_ctx.turn_tokens += part.completion_tokens

            if part.thinking:
                state.thinking_active = True
                state.accumulated_thinking += part.thinking
                yield ThinkingDelta(delta=part.thinking)
            elif part.delta:
                # The first visible text after reasoning closes the thinking block.
                if state.thinking_active:
                    state.thinking_active = False
                    yield ThinkingEnd()
                state.accumulated_text += part.delta
                yield TextDelta(delta=part.delta)

            if part.tool_calls:
                # A later chunk's tool_calls replace any earlier list.
                state.turn_tool_calls = part.tool_calls

            if part.finish_reason and state.thinking_active:
                state.thinking_active = False
                yield ThinkingEnd()

    async def _execute_tools_with_sink(self, turn_tool_calls, tools, turn_ctx: AgentTurnContext, session, stop_event, tool_ctx=None):
        """Execute tool calls one by one with cooperative stop support.

        Each tool runs in its own task with a fresh ``asyncio.Queue`` installed
        as ``current_event_sink`` just around task creation (create_task copies
        the current context, so only that task sees the sink). Events the tool
        puts on the sink are forwarded to the caller, except SubagentComplete /
        AIHelperTokensUsed, which are folded into *turn_ctx* counters.

        *stop_event* is checked after every forwarded event and after every
        1-second idle poll. The Stop button therefore works both for silent
        long-running tools (terminal, SSH, web search, sub-agent spawn) and for
        tools that emit progress events continuously.

        Yields:
            ToolStarted before each tool, forwarded sink events, then a ToolEvent
            with the result. On stop: a synthetic "stopped" ToolEvent, then
            StreamStopped as the final event (the session is saved first and the
            remaining tool calls are skipped).

        Raises:
            LLMBackendError / LLMConnectionError raised by a tool are re-raised;
            any other tool exception becomes an ``Error: ...`` tool result.
        """
        tool_map = {t.name: t for t in tools}
        for tc in turn_tool_calls:
            yield ToolStarted(tool_name=tc.name, arguments=tc.arguments, tool_call_id=tc.id)

            sink: asyncio.Queue = asyncio.Queue()
            sink_token = current_event_sink.set(sink)
            result_holder: list = []

            async def _run_with_sentinel(_tc=tc, _holder=result_holder, _sink=sink):
                try:
                    _holder.append(await self._tool_executor._run_single_tool(_tc, tool_map, ctx=tool_ctx))
                except Exception as exc:
                    _holder.append(exc)
                finally:
                    # Always wake the drain loop below, even on error or cancellation.
                    await _sink.put(_TOOL_DONE)

            tool_task = asyncio.create_task(_run_with_sentinel())
            current_event_sink.reset(sink_token)

            try:
                stopped = False
                while True:
                    try:
                        item = await asyncio.wait_for(sink.get(), timeout=1.0)
                    except asyncio.TimeoutError:
                        pass  # idle poll — fall through to the stop check
                    else:
                        if item is _TOOL_DONE:
                            break
                        if isinstance(item, SubagentComplete):
                            turn_ctx.subagent_tokens += item.token_count
                            turn_ctx.tool_call_count += item.tool_call_count
                        elif isinstance(item, AIHelperTokensUsed):
                            turn_ctx.subagent_tokens += item.completion_tokens
                        else:
                            yield item

                    # Checked after every wake-up, not only on idle timeouts: a tool
                    # emitting events more than once per second would otherwise
                    # never observe the stop request.
                    if stop_event and stop_event.is_set():
                        stopped = True
                        if not tool_task.done():
                            tool_task.cancel()
                        break

                if stopped:
                    # Tool was interrupted by user — record a synthetic cancellation result
                    log.info("agent.tool_stopped", tool=tc.name)
                    yield ToolEvent(
                        tool_name=tc.name, arguments=tc.arguments,
                        result="Tool execution was stopped by the user.", success=False,
                        tool_call_id=tc.id,
                    )
                    session.messages.append(Message(
                        role="tool", content="Tool execution was stopped by the user.",
                        tool_call_id=tc.id, name=tc.name, metadata={},
                        is_context=False,
                    ))
                    await self._sessions.save(session)
                    yield StreamStopped()
                    return

                r = result_holder[0] if result_holder else RuntimeError("tool task produced no result")
                if isinstance(r, Exception):
                    if isinstance(r, (LLMBackendError, LLMConnectionError)):
                        raise r
                    log.warning("agent.tool_exception", tool=tc.name, error=str(r))
                    tool_event = ToolEvent(
                        tool_name=tc.name, arguments=tc.arguments,
                        result=f"Error: {r}", success=False,
                        tool_call_id=tc.id,
                    )
                    msg = Message(role="tool", content=f"Error: {r}", tool_call_id=tc.id, name=tc.name, metadata={})
                    image_msg = None
                else:
                    tool_event, msg, image_msg = r

                turn_ctx.tool_call_count += 1
                yield tool_event
                session.messages.append(msg)
                session.context.append(msg)
                if image_msg:
                    session.context.append(image_msg)
            finally:
                if not tool_task.done():
                    tool_task.cancel()
                # Wait for the task to settle WITHOUT re-raising its outcome.
                # Awaiting a cancelled task directly raises CancelledError, a
                # BaseException since Python 3.8 that ``except Exception``
                # misses; that would abort the stream right after StreamStopped.
                # A cancellation of *this* coroutine still propagates out of
                # asyncio.wait().
                await asyncio.wait({tool_task})