diff --git a/navi/core/agent.py b/navi/core/agent.py index 7592b36..0c3b966 100644 --- a/navi/core/agent.py +++ b/navi/core/agent.py @@ -379,7 +379,7 @@ session.messages.append(assistant_msg) session.context.append(assistant_msg) - async for _ev in self._execute_tools_with_sink(turn_tool_calls, tools, turn_ctx, session): + async for _ev in self._execute_tools_with_sink(turn_tool_calls, tools, turn_ctx, session, stop_event): yield _ev # 6. Cooperative stop: check after tool execution before next LLM call @@ -583,7 +583,13 @@ state.thinking_active = False yield ThinkingEnd() - async def _execute_tools_with_sink(self, turn_tool_calls, tools, turn_ctx: AgentTurnContext, session): + async def _execute_tools_with_sink(self, turn_tool_calls, tools, turn_ctx: AgentTurnContext, session, stop_event): + """Execute tool calls with cooperative stop support. + + Polls *stop_event* every second while draining the event sink so the + Stop button works even during long-running tools (terminal, SSH, + web search, sub-agent spawn). + """ tool_map = {t.name: t for t in tools} for tc in turn_tool_calls: yield ToolStarted(tool_name=tc.name, arguments=tc.arguments) @@ -604,18 +610,44 @@ current_event_sink.reset(sink_token) try: + # Poll sink with 1s timeout so stop_event is checked during long tools + stopped = False while True: - item = await sink.get() + try: + item = await asyncio.wait_for(sink.get(), timeout=1.0) + except asyncio.TimeoutError: + if stop_event and stop_event.is_set(): + stopped = True + if not tool_task.done(): + tool_task.cancel() + break + continue + if item is _TOOL_DONE: break if isinstance(item, SubagentComplete): turn_ctx.subagent_tokens += item.token_count turn_ctx.tool_call_count += item.tool_call_count elif isinstance(item, AIHelperTokensUsed): - turn_ctx.subagent_tokens += item.total + turn_ctx.subagent_tokens += item.completion_tokens else: yield item + if stopped: + # Tool was interrupted by user — record a synthetic cancellation result + log.info("agent.tool_stopped", tool=tc.name) + yield ToolEvent( + tool_name=tc.name, arguments=tc.arguments, + result="Tool execution was stopped by the user.", success=False, + ) + session.messages.append(Message( + role="tool", content="Tool execution was stopped by the user.", + tool_call_id=tc.id, name=tc.name, metadata={}, + )) + await self._sessions.save(session) + yield StreamStopped() + return + r = result_holder[0] if result_holder else RuntimeError("tool task produced no result") if isinstance(r, Exception): if isinstance(r, (LLMBackendError, LLMConnectionError)): diff --git a/navi/core/container.py b/navi/core/container.py index 31c695d..c98f7e4 100644 --- a/navi/core/container.py +++ b/navi/core/container.py @@ -60,12 +60,12 @@ if self.mcp_manager is not None: try: await self.mcp_manager.disconnect_all() - except Exception: + except BaseException: pass if self.database is not None: try: await self.database.close() - except Exception: + except BaseException: pass diff --git a/navi/core/subagent_runner.py b/navi/core/subagent_runner.py index 6da19e2..f269bea 100644 --- a/navi/core/subagent_runner.py +++ b/navi/core/subagent_runner.py @@ -349,6 +349,12 @@ ) for tc in turn_tool_calls: + # Cooperative stop: if the user clicked Stop before this tool, + # skip remaining tools in this batch. + if stop_event and stop_event.is_set(): + log.info("agent.subagent.tool_batch_stopped", tool=tc.name) + return "", False + _sub_tool_count += 1 if sink is not None: await sink.put( diff --git a/navi/mcp/manager.py b/navi/mcp/manager.py index 2379e7a..4b4ce32 100644 --- a/navi/mcp/manager.py +++ b/navi/mcp/manager.py @@ -68,17 +68,13 @@ """Close every open connection and clear the client pool.""" if not self._clients: return - try: - results = await asyncio.gather( - *[c.disconnect() for c in self._clients.values()], - return_exceptions=True, - ) - for name, exc in zip(self._clients, results): - if isinstance(exc, Exception): - logger.warning("MCP server %r disconnect error: %s", name, exc) - except (asyncio.CancelledError, RuntimeError): - # Shutdown-time cancellation from anyio task scopes — safe to ignore - pass + for name, client in list(self._clients.items()): + try: + await client.disconnect() + except asyncio.CancelledError: + pass + except Exception as exc: + logger.warning("MCP server %r disconnect error: %s", name, exc) self._clients.clear() async def get_all_tools(self) -> list[tuple[str, Any]]: