from __future__ import annotations
from typing import Any
from navi.tools.base import Tool, ToolResult
from .manager import McpManager
class McpTool(Tool):
    """A :class:`Tool` proxy that forwards execution to an MCP server.

    The name is ``mcp_<server>_<tool>`` to avoid collisions with built-in
    and user-defined tools.

    Attributes:
        server_name: Name of the MCP server, passed to the manager on
            every call.
        tool_name: Name of the tool on that server, passed to the manager
            on every call.
        description: Description text, stored as given.
        parameters: Mapping describing the tool's parameters, stored as
            given (presumably a JSON schema -- confirm against callers).
        name: Derived name, ``mcp_<server_name>_<tool_name>``.
    """

    def __init__(
        self,
        server_name: str,
        tool_name: str,
        description: str,
        parameters: dict[str, Any],
        manager: McpManager,
    ) -> None:
        """Store the tool metadata and derive the prefixed tool name.

        Args:
            server_name: Name of the MCP server that owns the tool.
            tool_name: Name of the tool on that server.
            description: Description text for the tool.
            parameters: Mapping describing the tool's parameters.
            manager: Manager whose ``call_tool`` performs the actual call.
        """
        # NOTE(review): ``Tool.__init__`` is not invoked here -- confirm the
        # base class needs no initialisation of its own.
        self.server_name = server_name
        self.tool_name = tool_name
        self.description = description
        self.parameters = parameters
        self._manager = manager
        self.name = f"mcp_{server_name}_{tool_name}"

    async def execute(self, params: dict[str, Any]) -> ToolResult:
        """Call the remote tool through the manager and wrap the outcome.

        Args:
            params: Arguments forwarded unchanged to the manager's
                ``call_tool``.

        Returns:
            A successful :class:`ToolResult` carrying the output when the
            call completes without the error flag; a failed result carrying
            the output and a fixed error message when the manager reports
            ``is_error``; a failed result with empty output and the
            exception's message (or its class name when the message is
            empty) when anything in the call raises.
        """
        try:
            output, is_error = await self._manager.call_tool(
                self.server_name, self.tool_name, params
            )
            if is_error:
                return ToolResult(
                    success=False,
                    output=output,
                    error="MCP tool reported an error",
                )
            return ToolResult(success=True, output=output)
        except Exception as exc:
            # Failures are reported as a result instead of being raised, so
            # the broad catch is deliberate. Exceptions created without
            # arguments stringify to "" -- fall back to the class name so
            # the reported error is never blank.
            message = str(exc) or type(exc).__name__
            return ToolResult(success=False, output="", error=message)