Newer
Older
navi-1 / navi / core / agent.py
"""
Agent: the tool-calling loop.

Flow:
1. Receive user message, load session + profile
2. Build tool schemas from profile's enabled_tools
3. Loop (up to max_iterations):
   a. Call LLM with session.context (may be compressed) + tool schemas;
      the reply is streamed as it arrives
   b. If the reply contains no tool calls -> it is the final response:
      save it and emit StreamEnd
   c. Otherwise -> execute the tools and append the calls and results to both
      session.messages (display history) and session.context (LLM context)
4. After StreamEnd: run workers sequentially (e.g. context compression)

session.messages — full display history, never compressed
session.context  — what the LLM sees; workers may compress this
"""

import asyncio
import base64
import io
import re
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import TYPE_CHECKING, AsyncGenerator

import structlog
from PIL import Image

from navi.config import settings
from navi.exceptions import ContextTooLargeError, LLMBackendError, LLMConnectionError, MaxIterationsReached, SessionNotFound
from navi.llm.base import LLMBackend, Message, ToolCallRequest
from navi.tools._internal.base import Tool, ToolContext, current_event_sink, current_stop_event, current_user_role, current_user_info

from .agent_run_context import AgentTurnContext, StreamState
from .anti_stall import AntiStallMonitor
from .compressor import ContextCompressor, should_compress
from .context_builder import ContextBuilder
from .planning import PlanningEngine
from .stream_guard import _iter_stream_guarded
from .subagent_runner import SubAgentRunner
from .tool_utils import build_tool_list, load_user_enabled_tools
from .events import (
    AgentEvent,
    AIHelperTokensUsed,
    CompressionStarted,
    ContextCompressed,
    PlanningDebugData,
    PlanningStatus,
    PlanReady,
    StreamEnd,
    StreamStopped,
    SubagentComplete,
    TextDelta,
    ThinkingDelta,
    ThinkingEnd,
    ToolEvent,
    ToolStarted,
    TurnThinking,
)
from .registry import BackendRegistry, ProfileRegistry, ToolRegistry
from .session import SessionStore
from .tool_executor import ToolExecutor

if TYPE_CHECKING:
    from navi.context_providers._loader import ContextProviderRegistry
    from navi.memory.store import MemoryStore
    from navi.workers.base import Worker, WorkerContext



log = structlog.get_logger()

# Sentinel: placed in the event sink by the tool wrapper to signal completion.
# It is compared by identity (``item is _TOOL_DONE``), so it cannot be
# confused with any real event object a tool puts on the sink.
_TOOL_DONE = object()


# Vocabulary used by _is_casual_message(): a message counts as social chat
# when at least half of its words (longer than one character) are in this set.
# NOTE: single-character entries ("s", "a", "в", "и", "а") can never match,
# because _is_casual_message() drops words of length 1 before the lookup.
# "are" is listed twice, which is harmless in a frozenset.
_CASUAL_WORDS = frozenset({
    # Russian greetings/social
    "привет", "здравствуй", "здравствуйте", "хай", "хелло", "хеллоу",
    "как", "дела", "делишки", "ты", "вы", "поживаешь", "поживаете",
    "жизнь", "сам", "сама",
    "спасибо", "спс", "пока", "bye", "goodbye",
    "доброе", "утро", "добрый", "день", "вечер", "спокойной", "ночи",
    "ок", "окей", "ладно", "давай",
    # English greetings/social
    "hi", "hello", "hey", "hola", "bonjour",
    "how", "are", "you", "it", "going", "things", "what", "up", "s",
    "thanks", "thank", "thx",
    "good", "morning", "afternoon", "evening", "night", "see", "cya",
    "ok", "okay",
    # Common fillers that keep a social phrase social
    "a", "an", "the", "and", "too", "very", "much", "today", "now",
    "there", "here", "again", "well", "oh", "ah", "um",
    "в", "и", "а", "но", "же", "тоже", "очень", "сегодня", "сейчас",
    "ну", "вот", "тут", "ещё", "раз", "ка",
    "is", "am", "are", "do", "does", "did", "be", "been", "being",
    "man", "dude", "bro", "mate", "friend", "dear",
})


def _is_casual_message(text: str) -> bool:
    """Fast heuristic: obvious social/greeting chat that doesn't need planning.

    Conservative by design. Returns True only for short, tool-free social
    utterances (e.g. 'привет', 'как дела', 'спасибо'). Anything that looks
    like a command, URL, path, or multi-step request is treated as non-casual.
    """
    if not text:
        return True
    # Tool/action markers and URLs disqualify the message immediately.
    if any(marker in text for marker in ("@", "!", "http://", "https://", "file://")):
        return False
    # Path-like or command-like fragments.
    if any(fragment in text for fragment in ("/home", "/tmp", "/etc", "./", "../", "~/", "\\", ".py ", ".txt", ".md", ".json")):
        return False
    # A bare leading slash is a command/path indicator.
    if text.strip().startswith("/"):
        return False
    # Long messages are very unlikely to be pure social greetings.
    if len(text) > 100:
        return False
    stripped = text.strip()
    # Very short messages are treated as casual by default.
    if len(stripped) <= 10:
        return True
    words = [w for w in re.findall(r"\b\w+\b", stripped.lower()) if len(w) > 1]
    if not words:
        return True
    casual_count = sum(1 for w in words if w in _CASUAL_WORDS)
    return casual_count / len(words) >= 0.5


async def _todo_progress_message(session_id: str, *, first_iteration: bool = False) -> "Message | None":
    """Return the todo-progress reminder message for *session_id*, if any.

    Thin async shim over ``navi.tools.todo.get_progress_message``.  The import
    is deferred to call time (presumably to avoid an import cycle; unverified).
    """
    from navi.tools.todo import get_progress_message as _fetch_progress

    reminder = await _fetch_progress(session_id, first_iteration=first_iteration)
    return reminder


class Agent:
    def __init__(
        self,
        session_store: "SessionStore | None",
        profile_registry: ProfileRegistry,
        tool_registry: ToolRegistry,
        backend_registry: BackendRegistry,
        workers: list["Worker"] | None = None,
        memory_store: "MemoryStore | None" = None,
        cp_registry: "ContextProviderRegistry | None" = None,
        mcp_manager=None,
    ) -> None:
        """Wire the agent to its stores/registries and build its subsystems.

        Subsystems are constructed in dependency order:
          1. the context builder (reused by planning and the sub-agent runner),
          2. the tool executor, the planning engine and the compressor,
          3. the sub-agent runner, which shares all of them.
        """
        # Collaborators supplied by the caller, stored as-is.
        self._sessions = session_store
        self._profiles = profile_registry
        self._tools = tool_registry
        self._backends = backend_registry
        self._workers: list["Worker"] = workers or []
        self._memory = memory_store
        self._cp_registry = cp_registry
        self._mcp_manager = mcp_manager

        builder = ContextBuilder(
            profile_registry=profile_registry,
            memory_store=memory_store,
            cp_registry=cp_registry,
            mcp_manager=mcp_manager,
        )
        executor = ToolExecutor(tool_registry)
        planner = PlanningEngine(builder)
        compressor = ContextCompressor()

        self._ctx_builder = builder
        self._tool_executor = executor
        self._planning = planner
        self._compressor = compressor
        self._subagent = SubAgentRunner(
            profile_registry=profile_registry,
            tool_registry=tool_registry,
            backend_registry=backend_registry,
            ctx_builder=builder,
            compressor=compressor,
            planning=planner,
            tool_executor=executor,
            session_store=session_store,
            mcp_manager=mcp_manager,
        )

    def _set_active_profile(self, profile) -> None:
        """Push *profile* to the profile-dependent subsystems.

        The compressor is updated first, then the sub-agent runner.
        """
        for subsystem in (self._compressor, self._subagent):
            subsystem.set_profile(profile)

    # ------------------------------------------------------------------
    # Public interface
    # ------------------------------------------------------------------

    async def run(self, session_id: str, user_message: str, images: list[str] | None = None, files: list[dict] | None = None, is_recall: bool = False) -> str:
        """Run one turn to completion without streaming.

        Drives run_stream() until it is exhausted and returns the
        ``full_content`` of the last StreamEnd event.  Returns "" if no
        StreamEnd was emitted or its content was empty/None.
        """
        events = self.run_stream(session_id, user_message, images=images, files=files, is_recall=is_recall)
        final_text = ""
        async for ev in events:
            if not isinstance(ev, StreamEnd):
                continue
            final_text = ev.full_content or ""
        return final_text

    async def run_ephemeral(
        self,
        user_message: str,
        profile_id: str,
        max_iterations: int = 40,
        exclude_tools: list[str] | None = None,
        briefing: str | None = None,
        custom_system_prompt: str | None = None,
        inherit_system_prompt: bool = False,
        context_transfer: str | None = None,
        parent_session_id: str | None = None,
        timeout_seconds: float = 300.0,
    ) -> tuple[str, bool]:
        """Run a throwaway sub-agent turn by delegating to SubAgentRunner.run().

        Every argument is forwarded unchanged by keyword.  The result is
        whatever SubAgentRunner.run() returns (annotated here as ``(str, bool)``).
        """
        options = dict(
            user_message=user_message,
            profile_id=profile_id,
            max_iterations=max_iterations,
            exclude_tools=exclude_tools,
            briefing=briefing,
            custom_system_prompt=custom_system_prompt,
            inherit_system_prompt=inherit_system_prompt,
            context_transfer=context_transfer,
            parent_session_id=parent_session_id,
            timeout_seconds=timeout_seconds,
        )
        return await self._subagent.run(**options)

    async def run_stream(
        self,
        session_id: str,
        user_message: str,
        images: list[str] | None = None,
        display_message: str | None = None,
        files: list[dict] | None = None,
        is_recall: bool = False,
    ) -> AsyncGenerator[AgentEvent, None]:
        """
        Streaming variant. Yields AgentEvent objects:
        - ThinkingDelta / ThinkingEnd: reasoning chunks
        - ToolStarted / ToolEvent: tool call start, then call + result
        - TextDelta / StreamEnd: final streamed response
        - CompressionStarted (+ the compressor's event): context compression
        - StreamStopped: the turn was stopped via the stop event
        - planning events forwarded from PlanningEngine, and worker events
          (e.g. ContextCompressed) after StreamEnd

        Args:
            session_id: Session to run; must exist in the session store.
            user_message: The text sent to the LLM (may contain injected hints).
            images: Optional base64-encoded images attached to the message.
            display_message: Optional text shown to the user in the chat UI.
                When omitted, user_message is used for both LLM context and display.
            files: Optional file attachments, stored on both user messages.
            is_recall: Stored as the ``is_recall`` flag of both user messages.

        Raises:
            SessionNotFound: if *session_id* is unknown.
            MaxIterationsReached: if profile.max_iterations LLM turns pass
                without a final (tool-free) response.
        """
        session = await self._sessions.get(session_id)
        if session is None:
            raise SessionNotFound(session_id)

        # NOTE(review): _set_active_profile() is only called on a mid-turn
        # profile switch below, not here; confirm the compressor / sub-agent
        # profile is set elsewhere for the session's initial profile.
        profile = self._profiles.get(session.profile_id)
        tools = self._tool_list(profile.get_agent_tools())
        tool_schemas = [t.schema() for t in tools]
        llm = self._get_backend(profile.llm_backend)

        mem = await self._ctx_builder._memory_msg(user_id=session.user_id)

        # Expose session_id and model to tools via ContextVar.  The tokens are
        # not reset on exit: ContextVar.reset() raises ValueError when called
        # from a different Context than the one that set the value.
        from navi.tools._internal.base import current_session_id as _sid_var, current_model as _model_var
        _sid_var.set(session_id)
        _model_var.set(profile.model)

        # Pre-turn compression: if the last turn filled the context past the
        # threshold, compress NOW before calling the LLM.  This prevents the
        # model from seeing an over-full context and generating gibberish
        # (e.g. a "summary of the conversation" instead of a real answer).
        async for _ev in self._compression_events_preturn(session, llm, profile, session_id):
            yield _ev

        display_text = display_message if display_message is not None else user_message
        user_msg_display = Message(role="user", content=display_text, images=images or None,
                                   files=files or None, created_at=datetime.now(timezone.utc),
                                   is_recall=is_recall, is_context=False)

        # Image token budgeting: fit as many images as possible into the LLM
        # context; overflow images are saved to the session directory.
        images_for_context, context_content = self._fit_images_to_context(
            session, session_id, user_message, images,
        )

        user_msg_context = Message(role="user", content=context_content, images=images_for_context or None,
                                   files=files or None, created_at=datetime.now(timezone.utc),
                                   is_recall=is_recall, is_display=False)

        # Decide "first message" BEFORE appending this turn's messages.  Every
        # turn stores two user-role messages (display copy + context copy), so
        # counting after the append can never equal 1.  That made the
        # first-message planning rule below unreachable.
        _is_first_message = not any(m.role == "user" for m in session.messages)

        session.messages.append(user_msg_display)
        session.messages.append(user_msg_context)
        session.context.append(user_msg_context)
        # Persist user message immediately so it survives a client disconnect
        # before the assistant reply is ready.
        await self._sessions.save(session)

        stop_event = current_stop_event.get()
        turn_ctx = AgentTurnContext(turn_start=time.monotonic())

        # Planning phase — always runs on the first user message in a session;
        # on subsequent messages uses the profile's planning_enabled flag.
        # force_plan suppresses the DIRECT shortcut: first message is always forced,
        # and planning_mandatory extends that to every subsequent message.
        # Casual greetings are exempt from planning even on the first message.
        _is_casual = _is_casual_message(context_content) and not profile.planning_mandatory
        _force_plan = (_is_first_message and not _is_casual) or profile.planning_mandatory
        if (_is_first_message or profile.planning_enabled) and not _is_casual:
            log.debug("agent.planning_enter", session_id=session_id, first_message=_is_first_message, planning_enabled=profile.planning_enabled, force_plan=_force_plan)
            # Cap to prevent unbounded DB growth on long sessions
            max_planning_logs = 20
            async for _ev in self._planning.run(session.context, profile, llm, mem, tool_schemas, messages=session.messages, force_plan=_force_plan):
                if isinstance(_ev, AIHelperTokensUsed):
                    turn_ctx.subagent_tokens += _ev.completion_tokens
                elif isinstance(_ev, PlanningDebugData):
                    session.planning_logs.append(_ev.log)
                    if len(session.planning_logs) > max_planning_logs:
                        session.planning_logs = session.planning_logs[-max_planning_logs:]
                else:
                    yield _ev

        ctx_task = asyncio.create_task(self._ctx_builder._collect_context_injections(profile))
        mem_facts_task = asyncio.create_task(
            self._ctx_builder._memory_facts_msg(
                user_message, user_id=session.user_id, injected_ids=turn_ctx.injected_fact_ids
            )
        )
        ctx_injections = await ctx_task
        mem_facts = await mem_facts_task
        if mem_facts:
            ctx_injections.append(mem_facts)
            log.debug("agent.memory_facts_injected", session_id=session_id, facts_msg_length=len(mem_facts.content or ""))
        else:
            log.debug("agent.memory_facts_none", session_id=session_id)

        anti_stall = AntiStallMonitor(profile)
        if profile.anti_stall_enabled or profile.adaptive_replan_enabled:
            await anti_stall.init(session_id)

        # Tool-calling loop — uses stream_complete() for every turn so thinking
        # is captured in real-time via ThinkingDelta/ThinkingEnd events.
        for iteration in range(profile.max_iterations):
            # Cooperative stop: check before each LLM call
            if stop_event and stop_event.is_set():
                await self._sessions.save(session)
                yield StreamStopped()
                return

            async for _ev in self._compression_events_midturn(session, llm, profile, session_id, iteration, ctx_injections, mem):
                yield _ev

            state = StreamState()

            built_ctx = self._ctx_builder.build(session.context, profile, mem,
                                                  iteration=iteration, max_iterations=profile.max_iterations,
                                                  extra_system=ctx_injections,
                                                  session_id=session_id)

            if (
                profile.goal_anchoring_enabled
                and iteration > 0
                and iteration % profile.goal_anchoring_interval == 0
            ):
                built_ctx.append(await self._ctx_builder._build_goal_anchor(session_id, user_message))

            todo_msg = await _todo_progress_message(session_id, first_iteration=(iteration == 0))
            if todo_msg:
                built_ctx.append(todo_msg)

            # Anti-stall / adaptive re-plan: inject intervention message if needed
            _stall_msg = await anti_stall.pre_turn(session_id, iteration)
            if _stall_msg:
                built_ctx.append(_stall_msg)

            try:
                self._compressor.check_context_size(built_ctx)
            except ContextTooLargeError as e:
                # Surface the error as a Navi response (not a raw system error) so the
                # user sees a coherent message and the exchange is saved to history.
                error_text = str(e)
                assistant_msg = Message(role="assistant", content=error_text)
                session.context.append(assistant_msg)
                session.messages.append(assistant_msg)
                await self._sessions.save(session)
                yield TextDelta(delta=error_text)
                # StreamEnd carries the reply in ``full_content`` (as on the
                # normal final-response path, and as run() reads it).
                yield StreamEnd(full_content=error_text)
                return

            async for _ev in self._consume_stream(
                _iter_stream_guarded(
                    llm.stream_complete(
                        built_ctx,
                        tools=tool_schemas if tools else None,
                        temperature=profile.temperature,
                        model=profile.model,
                        think=profile.think_enabled,
                        top_k=profile.top_k,
                        top_p=profile.top_p,
                        num_thread=profile.num_thread,
                    ),
                    stop_event=stop_event,
                    first_chunk_timeout=settings.llm_stream_first_chunk_timeout,
                    chunk_timeout=settings.llm_stream_chunk_timeout,
                ),
                stop_event,
                turn_ctx,
                state,
            ):
                yield _ev

            # Stopped mid-stream — save partial response and exit
            if stop_event and stop_event.is_set():
                if state.accumulated_text:
                    session.messages.append(Message(
                        role="assistant", content=state.accumulated_text,
                        created_at=datetime.now(timezone.utc),
                        is_context=False,
                    ))
                await self._sessions.save(session)
                yield StreamStopped()
                return

            turn_tool_calls = state.turn_tool_calls

            if not turn_tool_calls:
                # Final response — text already streamed above
                _elapsed = round(time.monotonic() - turn_ctx.turn_start, 1)
                # Net tokens = accumulated across all iterations + subagent tokens
                _net_tokens = turn_ctx.turn_tokens + turn_ctx.subagent_tokens
                assistant_msg = Message(
                    role="assistant",
                    content=state.accumulated_text or None,
                    thinking=state.accumulated_thinking or None,
                    created_at=datetime.now(timezone.utc),
                    elapsed_seconds=_elapsed,
                    tool_call_count=turn_ctx.tool_call_count if turn_ctx.tool_call_count else None,
                    token_count=_net_tokens if _net_tokens else None,
                )
                session.messages.append(assistant_msg)
                session.context.append(assistant_msg)
                session.context_token_count = state.context_tokens or 0
                await self._sessions.save(session)

                yield StreamEnd(
                    full_content=state.accumulated_text,
                    context_tokens=state.context_tokens,
                    max_context_tokens=settings.ollama_num_ctx,
                    elapsed_seconds=_elapsed,
                    tool_call_count=turn_ctx.tool_call_count,
                    token_count=_net_tokens if _net_tokens else None,
                )

                for event in await self._run_workers(session, llm, profile.model, state.context_tokens):
                    yield event
                return

            # Tool calls turn — record to session and execute
            assistant_msg = Message(
                role="assistant",
                content=state.accumulated_text or None,
                thinking=state.accumulated_thinking or None,
                tool_calls=turn_tool_calls,
            )
            session.messages.append(assistant_msg)
            session.context.append(assistant_msg)

            tool_ctx = ToolContext(
                session_id=session_id,
                event_sink=None,  # set per-tool inside _execute_tools_with_sink
                stop_event=stop_event,
                model=profile.model,
                user_id=session.user_id,
                user_role=current_user_role.get(),
                user_info=current_user_info.get(),
            )
            async for _ev in self._execute_tools_with_sink(turn_tool_calls, tools, turn_ctx, session, stop_event, tool_ctx):
                yield _ev

            # Cooperative stop: check after tool execution before next LLM call
            if stop_event and stop_event.is_set():
                await self._sessions.save(session)
                yield StreamStopped()
                return

            # Update anti-stall counters and adaptive-replan state
            await anti_stall.post_turn(session_id, turn_tool_calls)

            # If switch_profile was called this iteration, reload profile + tools.
            # switch_profile updates the DB but run_stream() holds a local session
            # object — without this check the final save would overwrite the change
            # and the next LLM call would still use the old tool schemas.
            fresh = await self._sessions.get(session_id)
            if fresh and fresh.profile_id != session.profile_id:
                session.profile_id = fresh.profile_id
                profile = self._profiles.get(session.profile_id)
                self._set_active_profile(profile)
                tools = self._tool_list(profile.get_agent_tools())
                tool_schemas = [t.schema() for t in tools]
                llm = self._get_backend(profile.llm_backend)
                # Keep the ContextVar consistent with the new profile's model
                # (ToolContext below already uses profile.model).
                _model_var.set(profile.model)
                log.info(
                    "agent.profile_reloaded",
                    session_id=session_id,
                    new_profile=session.profile_id,
                )

        await self._sessions.save(session)
        raise MaxIterationsReached(profile.max_iterations)

    def _fit_images_to_context(
        self,
        session,
        session_id: str,
        user_message: str,
        images: list[str] | None,
    ) -> tuple[list[str] | None, str]:
        """Keep as many *images* as the context budget allows; save the rest.

        Budget: 80% of ``settings.ollama_num_ctx`` minus the estimated size of
        ``session.context``, charged at 500 tokens per image.  Overflow images
        are handled as follows:
          * decoded from base64 and converted to RGB,
          * shrunk so the longest side is at most 1024px,
          * written as JPEG into ``settings.session_files_dir/<session_id>/``,
            so Navi can view them later via image_view.

        Saving is best-effort: a failing image is logged and skipped, and the
        remaining images are still saved.

        Returns:
            ``(images_for_context, context_content)``: the images to attach to
            the LLM-context user message, and *user_message* with the saved
            file names appended when at least one overflow image was written.
        """
        if not images:
            return images, user_message

        current_tokens = self._compressor.estimate_context_tokens(session.context)
        available_tokens = int(settings.ollama_num_ctx * 0.8) - current_tokens
        max_images = max(0, available_tokens // 500)
        if max_images >= len(images):
            return images, user_message

        images_for_context = images[:max_images]
        overflow = images[max_images:]

        try:
            session_dir = Path(settings.session_files_dir) / session_id
            session_dir.mkdir(parents=True, exist_ok=True)
        except Exception:
            log.warning("agent.overflow_image_dir_failed", session_id=session_id, exc_info=True)
            return images_for_context, user_message

        saved_names: list[str] = []
        next_idx = 0
        for position, b64 in enumerate(overflow):
            try:
                # Skip names used by earlier turns.  A per-turn index restarting
                # at 0 would overwrite their files and break the file names
                # quoted in their context messages.
                while (session_dir / f"uploaded_{next_idx}.jpg").exists():
                    next_idx += 1
                name = f"uploaded_{next_idx}.jpg"
                next_idx += 1

                raw = base64.b64decode(b64)
                img = Image.open(io.BytesIO(raw)).convert("RGB")
                w, h = img.size
                if w > 1024 or h > 1024:
                    ratio = 1024 / max(w, h)
                    img = img.resize((int(w * ratio), int(h * ratio)), Image.LANCZOS)
                img.save(session_dir / name, format="JPEG", quality=85, optimize=True)
            except Exception:
                log.warning(
                    "agent.overflow_image_save_failed",
                    session_id=session_id,
                    overflow_index=position,
                    exc_info=True,
                )
                continue
            saved_names.append(name)

        context_content = user_message
        if saved_names:
            context_content += (
                f"\n\n[Additional images saved to session directory: {', '.join(saved_names)}]"
            )
        return images_for_context, context_content

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    async def _run_workers(
        self,
        session,
        llm: LLMBackend,
        model: str,
        context_tokens: int | None,
    ) -> list[AgentEvent]:
        """Run every configured worker in order and gather the events they emit.

        All workers share one WorkerContext.  It is built from the session id,
        the given LLM/model/context size, and the summary temperature from
        settings.  A worker that raises is logged as "agent.worker_failed" and
        skipped; the remaining workers still run.

        Returns:
            The concatenated ``result.events`` of the workers that succeeded.
        """
        from navi.workers.base import WorkerContext

        worker_ctx = WorkerContext(
            session_id=session.id,
            context_tokens=context_tokens,
            max_context_tokens=settings.ollama_num_ctx,
            llm=llm,
            model=model,
            temperature=settings.context_summary_temperature,
            session_store=self._sessions,
        )
        collected: list[AgentEvent] = []
        for worker in self._workers:
            try:
                outcome = await worker.run(session, worker_ctx)
                collected.extend(outcome.events)
            except Exception:
                log.warning("agent.worker_failed",
                            worker=type(worker).__name__, exc_info=True)
        return collected

    def _tool_list(
        self,
        scope: "ToolScopeConfig",
    ) -> list[Tool]:
        """Resolve a profile's tool scope into concrete Tool objects.

        Hands ``scope.native`` and ``scope.mcp`` to build_tool_list(), along
        with the agent's tool registry and MCP manager.
        NOTE(review): "ToolScopeConfig" is only a string annotation here; it is
        not imported in this module.
        """
        native_scope = scope.native
        mcp_scope = scope.mcp
        return build_tool_list(native_scope, mcp_scope, self._tools, self._mcp_manager)

    def _get_backend(self, backend_key: str) -> LLMBackend:
        """Return the LLM backend registered under *backend_key*."""
        registry = self._backends
        return registry.get(backend_key)

    async def _compression_events_preturn(self, session, llm, profile, session_id):
        """Compress ``session.context`` before the turn when it is over threshold.

        Runs only if all of the following hold:
          * compression is enabled in settings,
          * the context holds more than two messages,
          * should_compress() accepts the estimated token count.

        When it runs, it yields CompressionStarted, then the event returned by
        ``compress_and_save_session`` (if any).
        """
        if not settings.context_compression_enabled or len(session.context) <= 2:
            return

        # Estimate once; the same figure drives the threshold check and the
        # CompressionStarted event.
        context_tokens = self._compressor.estimate_context_tokens(session.context)
        if not should_compress(
            context_tokens,
            settings.ollama_num_ctx,
            settings.context_compression_threshold,
        ):
            return

        yield CompressionStarted(
            context_tokens=context_tokens,
            max_context_tokens=settings.ollama_num_ctx,
        )
        event = await self._compressor.compress_and_save_session(
            session=session,
            session_store=self._sessions,
            llm=llm,
            model=profile.model,
            temperature=settings.context_summary_temperature,
            session_id=session_id,
            reason="preturn",
            keep_recent=settings.context_keep_recent,
            max_tokens=settings.context_summary_max_tokens,
        )
        if event:
            yield event

    async def _compression_events_midturn(self, session, llm, profile, session_id, iteration, ctx_injections, mem):
        """Compress mid-turn when the next prompt would exceed the threshold.

        Active only when compression is enabled and from the second loop
        iteration on.  The size check uses the context as ``ctx_builder.build``
        assembles it (with *mem* and *ctx_injections*), not raw session.context.
        Yields CompressionStarted, then the compressor's event (if any).
        """
        if not (settings.context_compression_enabled and iteration > 0):
            return

        projected = self._ctx_builder.build(
            session.context,
            profile,
            mem,
            extra_system=ctx_injections,
            session_id=session_id,
        )
        projected_tokens = self._compressor.estimate_context_tokens(projected)
        over_threshold = should_compress(
            projected_tokens,
            settings.ollama_num_ctx,
            settings.context_compression_threshold,
        )
        if not over_threshold:
            return

        yield CompressionStarted(
            context_tokens=projected_tokens,
            max_context_tokens=settings.ollama_num_ctx,
        )
        compressed_event = await self._compressor.compress_and_save_session(
            session=session,
            session_store=self._sessions,
            llm=llm,
            model=profile.model,
            temperature=settings.context_summary_temperature,
            session_id=session_id,
            reason="midturn",
            keep_recent=settings.context_keep_recent,
            max_tokens=settings.context_summary_max_tokens,
            keep_recent_messages=max(12, settings.context_keep_recent * 2),
        )
        if compressed_event:
            yield compressed_event

    async def _consume_stream(self, stream_gen, stop_event, turn_ctx: AgentTurnContext, state: StreamState):
        """Drain one LLM stream, updating *state* / *turn_ctx* and yielding UI events.

        For each chunk, in order:
          * prompt_tokens is recorded as ``state.context_tokens``;
          * completion_tokens is added to ``turn_ctx.turn_tokens``;
          * thinking is accumulated and yielded as ThinkingDelta;
          * text is accumulated and yielded as TextDelta;
          * tool calls are stored in ``state.turn_tool_calls``.

        ThinkingEnd is yielded when text follows thinking, when a chunk carries
        a finish_reason while thinking is active, or on stop.  The loop breaks
        (without draining the rest of *stream_gen*) once *stop_event* is set.
        """
        async for chunk in stream_gen:
            if stop_event and stop_event.is_set():
                # Close an open thinking block for the UI; state.thinking_active
                # is left as-is (run_stream checks stop_event right after).
                if state.thinking_active:
                    yield ThinkingEnd()
                break
            if chunk.prompt_tokens is not None:
                state.context_tokens = chunk.prompt_tokens
            if chunk.completion_tokens is not None:
                turn_ctx.turn_tokens += chunk.completion_tokens
            if chunk.thinking:
                state.accumulated_thinking += chunk.thinking
                if not state.thinking_active:
                    state.thinking_active = True
                yield ThinkingDelta(delta=chunk.thinking)
            if chunk.delta:
                # First text after reasoning ends the thinking block.
                if state.thinking_active:
                    state.thinking_active = False
                    yield ThinkingEnd()
                state.accumulated_text += chunk.delta
                yield TextDelta(delta=chunk.delta)
            if chunk.tool_calls:
                # Replaces rather than extends: assumes the backend delivers the
                # complete tool-call list in a single chunk (TODO confirm).
                state.turn_tool_calls = chunk.tool_calls
            if chunk.finish_reason and state.thinking_active:
                state.thinking_active = False
                yield ThinkingEnd()

    async def _execute_tools_with_sink(self, turn_tool_calls, tools, turn_ctx: AgentTurnContext, session, stop_event, tool_ctx=None):
        """Execute tool calls one after another with cooperative stop support.

        Each tool runs in its own task.  A fresh ``asyncio.Queue`` is exposed to
        it through the ``current_event_sink`` ContextVar, and the events the tool
        puts there are drained here:
          * SubagentComplete and AIHelperTokensUsed are folded into *turn_ctx*;
          * all other events are re-yielded.
        The wrapper puts ``_TOOL_DONE`` on the queue when the tool finishes.

        *stop_event* is checked after every drained event and on every 1s idle
        timeout.  This makes the Stop button work during long-running tools
        (terminal, SSH, web search, sub-agent spawn), including tools that emit
        events continuously.  On stop:
          * the tool task is cancelled;
          * a synthetic "stopped" ToolEvent is yielded and recorded in
            ``session.messages`` (display only);
          * the session is saved;
          * the remaining tool calls are skipped.

        For a finished tool, its ToolEvent is yielded.  The result message (plus
        an optional image message) is appended to both ``session.messages`` and
        ``session.context``.

        Raises:
            LLMBackendError, LLMConnectionError: re-raised when a tool fails with
                one of them.  Other tool exceptions become "Error: ..." results.
        """
        tool_map = {t.name: t for t in tools}
        for tc in turn_tool_calls:
            yield ToolStarted(tool_name=tc.name, arguments=tc.arguments, tool_call_id=tc.id)

            sink: asyncio.Queue = asyncio.Queue()
            # create_task() copies the current Context, so the tool task sees this
            # sink; the var is reset right after so it does not leak here.
            sink_token = current_event_sink.set(sink)
            result_holder: list = []

            async def _run_with_sentinel(_tc=tc, _holder=result_holder, _sink=sink):
                try:
                    _holder.append(await self._tool_executor._run_single_tool(_tc, tool_map, ctx=tool_ctx))
                except Exception as exc:
                    _holder.append(exc)
                finally:
                    await _sink.put(_TOOL_DONE)

            tool_task = asyncio.create_task(_run_with_sentinel())
            current_event_sink.reset(sink_token)

            try:
                stopped = False
                while True:
                    try:
                        item = await asyncio.wait_for(sink.get(), timeout=1.0)
                    except asyncio.TimeoutError:
                        pass
                    else:
                        if item is _TOOL_DONE:
                            break
                        if isinstance(item, SubagentComplete):
                            turn_ctx.subagent_tokens += item.token_count
                            turn_ctx.tool_call_count += item.tool_call_count
                        elif isinstance(item, AIHelperTokensUsed):
                            turn_ctx.subagent_tokens += item.completion_tokens
                        else:
                            yield item

                    # Checked after every event as well as on each idle timeout:
                    # a tool emitting events faster than once a second would
                    # otherwise never reach the timeout branch and be unstoppable.
                    if stop_event and stop_event.is_set():
                        stopped = True
                        if not tool_task.done():
                            tool_task.cancel()
                        break

                if stopped:
                    # Tool was interrupted by user — record a synthetic cancellation result
                    log.info("agent.tool_stopped", tool=tc.name)
                    yield ToolEvent(
                        tool_name=tc.name, arguments=tc.arguments,
                        result="Tool execution was stopped by the user.", success=False,
                        tool_call_id=tc.id,
                    )
                    session.messages.append(Message(
                        role="tool", content="Tool execution was stopped by the user.",
                        tool_call_id=tc.id, name=tc.name, metadata={},
                        is_context=False,
                    ))
                    await self._sessions.save(session)
                    return

                r = result_holder[0] if result_holder else RuntimeError("tool task produced no result")
                if isinstance(r, Exception):
                    if isinstance(r, (LLMBackendError, LLMConnectionError)):
                        raise r
                    log.warning("agent.tool_exception", tool=tc.name, error=str(r))
                    tool_event = ToolEvent(
                        tool_name=tc.name, arguments=tc.arguments,
                        result=f"Error: {r}", success=False,
                        tool_call_id=tc.id,
                    )
                    msg = Message(role="tool", content=f"Error: {r}", tool_call_id=tc.id, name=tc.name, metadata={})
                    image_msg = None
                else:
                    tool_event, msg, image_msg = r

                turn_ctx.tool_call_count += 1
                yield tool_event
                session.messages.append(msg)
                session.context.append(msg)
                if image_msg:
                    session.messages.append(image_msg)
                    session.context.append(image_msg)
            finally:
                if not tool_task.done():
                    tool_task.cancel()
                # Wait for the tool task to settle.  gather(return_exceptions=True)
                # absorbs the task's own outcome, including the CancelledError
                # caused by cancelling it above.  That error is a BaseException, so
                # ``except Exception`` would let it escape and skip the caller's
                # StreamStopped handling.  Cancellation of *this* task still
                # propagates.
                await asyncio.gather(tool_task, return_exceptions=True)