diff --git a/navi/core/tool_executor.py b/navi/core/tool_executor.py index b36f827..3500ba0 100644 --- a/navi/core/tool_executor.py +++ b/navi/core/tool_executor.py @@ -38,7 +38,7 @@ if len(normalized_matches) == 1: return normalized_matches[0] - # Fallback: old underscore format like mcp_server_tool → mcp:server:tool + # Fallback: old underscore format like mcp_server_tool -> mcp:server:tool old_format_matches = [ (candidate_name, candidate) for candidate_name, candidate in tool_map.items() @@ -57,15 +57,15 @@ def __init__(self, tool_registry) -> None: self._tools = tool_registry - async def _run_single_tool( + async def _execute_one( self, tc: ToolCallRequest, tool_map: dict[str, Tool], ) -> tuple["ToolEvent", Message, "Message | None"]: - """Execute one tool call and return (ToolEvent, tool_msg, optional_image_msg). + """Execute a single tool call and return (ToolEvent, tool_msg, optional_image_msg). - Called via asyncio.create_task() from run_stream() so that the parent - generator can drain the event sink queue concurrently. + This is the single canonical path for tool resolution, middleware, + execution, and message construction. All public batch methods delegate here. """ from navi.core.events import ToolEvent @@ -101,86 +101,30 @@ name=resolved_name if tool is not None else tc.name, metadata=metadata) return event, msg, image_msg + async def _run_single_tool( + self, + tc: ToolCallRequest, + tool_map: dict[str, Tool], + ) -> tuple["ToolEvent", Message, "Message | None"]: + """Execute one tool call and return (ToolEvent, tool_msg, optional_image_msg). + + Called via asyncio.create_task() from run_stream() so that the parent + generator can drain the event sink queue concurrently. + """ + return await self._execute_one(tc, tool_map) + async def _execute_tool_calls( self, tool_calls: list[ToolCallRequest], tools: list[Tool] ) -> tuple[list[Message], list[Message]]: tool_map = {t.name: t for t in tools} - - async def _run_one(tc: ToolCallRequest) -> tuple[Message, Message | None]: - resolved_name, tool = _resolve_tool(tool_map, tc.name) - image_msg = None - metadata: dict = {} - if tool is None: - content = f"Error: tool '{tc.name}' not found." - else: - log.info("tool.execute", tool=resolved_name, requested_tool=tc.name, args=tc.arguments) - middlewares = getattr(self._tools, "_middlewares", []) - for mw in middlewares: - await mw.before_execute(resolved_name, tc.arguments) - result = await tool.execute(tc.arguments) - for mw in middlewares: - await mw.after_execute(resolved_name, tc.arguments, result) - content = result.to_message_content() - metadata = result.metadata or {} - if result.success and result.metadata and result.metadata.get("is_image"): - b64 = result.metadata.get("base64") - if b64: - image_msg = Message( - role="user", - content=f"[Image loaded via {resolved_name} — analyse it]", - images=[b64], - ) - tool_msg = Message(role="tool", content=content, tool_call_id=tc.id, name=resolved_name if tool is not None else tc.name, metadata=metadata) - return tool_msg, image_msg - - pairs = await asyncio.gather(*[_run_one(tc) for tc in tool_calls]) - tool_msgs = [p[0] for p in pairs] - image_msgs = [p[1] for p in pairs if p[1] is not None] + pairs = await asyncio.gather(*[self._execute_one(tc, tool_map) for tc in tool_calls]) + tool_msgs = [p[1] for p in pairs] + image_msgs = [p[2] for p in pairs if p[2] is not None] return tool_msgs, image_msgs async def _execute_tool_calls_streaming( self, tool_calls: list[ToolCallRequest], tools: list[Tool] ) -> tuple[list[tuple["ToolEvent", Message]], list[Message]]: - from navi.core.events import ToolEvent - tool_map = {t.name: t for t in tools} - middlewares = getattr(self._tools, "_middlewares", []) - - async def _run_one(tc: ToolCallRequest) -> tuple[ToolEvent, Message, Message | None]: - resolved_name, tool = _resolve_tool(tool_map, tc.name) - image_msg = None - metadata: dict = {} - if tool is None: - content = f"Error: tool '{tc.name}' not found." - event = ToolEvent( - tool_name=tc.name, arguments=tc.arguments, result=content, success=False - ) - else: - log.info("tool.execute", tool=resolved_name, requested_tool=tc.name, args=tc.arguments) - for mw in middlewares: - await mw.before_execute(resolved_name, tc.arguments) - result = await tool.execute(tc.arguments) - for mw in middlewares: - await mw.after_execute(resolved_name, tc.arguments, result) - content = result.to_message_content() - metadata = result.metadata or {} - event = ToolEvent( - tool_name=resolved_name, - arguments=tc.arguments, - result=content, - success=result.success, - metadata=metadata, - ) - if result.success and result.metadata and result.metadata.get("is_image"): - b64 = result.metadata.get("base64") - if b64: - image_msg = Message( - role="user", - content=f"[Image loaded via {resolved_name} — analyse it]", - images=[b64], - ) - msg = Message(role="tool", content=content, tool_call_id=tc.id, name=resolved_name if tool is not None else tc.name, metadata=metadata) - return event, msg, image_msg - - triples = await asyncio.gather(*[_run_one(tc) for tc in tool_calls]) + triples = await asyncio.gather(*[self._execute_one(tc, tool_map) for tc in tool_calls]) return [(t[0], t[1]) for t in triples], [t[2] for t in triples if t[2] is not None]