diff --git a/navi/api/routes/admin.py b/navi/api/routes/admin.py index dbac8cd..63001eb 100644 --- a/navi/api/routes/admin.py +++ b/navi/api/routes/admin.py @@ -395,3 +395,242 @@ log.info("admin.profile_updated", profile_id=profile_id, admin_id=user.id) return {"ok": True} + + +# ── MCP administration ────────────────────────────────────────────────────── + + +@router.get("/mcp/config") +async def admin_get_mcp_config( + user: Annotated[User, Depends(require_admin)], +) -> dict: + """Return the current mcp_servers.json configuration.""" + from navi.mcp.config import load_mcp_servers + + configs = load_mcp_servers() + return {name: cfg.model_dump() for name, cfg in configs.items()} + + +@router.put("/mcp/config") +async def admin_update_mcp_config( + body: dict, + user: Annotated[User, Depends(require_admin)], +) -> dict: + """Replace mcp_servers.json with the provided configuration.""" + from navi.mcp.config import McpServerConfig, save_mcp_servers + + validated: dict[str, McpServerConfig] = {} + for name, cfg_data in body.items(): + try: + validated[name] = McpServerConfig.model_validate(cfg_data) + except Exception as exc: + raise HTTPException( + status_code=400, + detail=f"Invalid config for server '{name}': {exc}", + ) from exc + + save_mcp_servers(validated) + log.info("admin.mcp_config_updated", admin_id=user.id) + return {"ok": True} + + +@router.post("/mcp/{server_name}/reconnect") +async def admin_reconnect_mcp_server( + server_name: str, + user: Annotated[User, Depends(require_admin)], +) -> dict: + """Disconnect and reconnect a single MCP server, then re-register its tools.""" + from navi.api.deps import _mcp_manager, get_tool_registry + from navi.mcp.client import McpClient + from navi.mcp.config import load_mcp_servers + from navi.mcp.tools import McpTool + + if _mcp_manager is None: + raise HTTPException(status_code=503, detail="MCP manager not initialized") + + configs = load_mcp_servers(_mcp_manager.config_path) + cfg = configs.get(server_name) + if cfg is None: + raise HTTPException( + status_code=404, detail=f"MCP server '{server_name}' not found in config" + ) + + # Drop old client + old_client = _mcp_manager.clients.pop(server_name, None) + if old_client: + try: + await old_client.disconnect() + except Exception: + pass + + # Unregister old tools for this server + tool_registry = get_tool_registry() + for name in list(tool_registry._external_names): + if name.startswith(f"mcp:{server_name}:"): + tool_registry.unregister_external(name) + + # Connect fresh + client = McpClient(server_name, cfg) + try: + await client.connect() + _mcp_manager.clients[server_name] = client + except Exception as exc: + log.warning("admin.mcp_reconnect_failed", server=server_name, error=str(exc)) + raise HTTPException( + status_code=502, detail=f"Reconnect failed: {exc}" + ) from exc + + # Re-register tools + try: + tools = await client.list_tools() + for tool in tools: + mcp_tool = McpTool( + server_name=server_name, + tool_name=tool.name, + description=tool.description or "", + parameters=tool.inputSchema, + manager=_mcp_manager, + ) + tool_registry.register_external(mcp_tool) + except Exception as exc: + log.warning( + "admin.mcp_list_tools_failed_after_reconnect", + server=server_name, + error=str(exc), + ) + + log.info("admin.mcp_reconnected", server=server_name, admin_id=user.id) + return {"ok": True} + + +@router.get("/mcp/status") +async def admin_get_mcp_status( + user: Annotated[User, Depends(require_admin)], +) -> dict: + """Return connection status and tool counts for every configured MCP server.""" + from navi.api.deps import _mcp_manager + from navi.mcp.config import load_mcp_servers + + configs = load_mcp_servers() + manager = _mcp_manager + + servers = [] + for name, cfg in configs.items(): + client = manager.clients.get(name) if manager else None + status: dict = { + "name": name, + "transport": cfg.transport, + "connected": client.connected if client else False, + "tool_count": 0, + "instructions": None, + "error": None, + } + if client and client.connected: + status["instructions"] = client.instructions + try: + tools = await client.list_tools() + status["tool_count"] = len(tools) + except Exception as exc: + status["error"] = str(exc) + servers.append(status) + + return {"servers": servers} + + +@router.post("/mcp/test") +async def admin_test_mcp_tool( + body: dict, + user: Annotated[User, Depends(require_admin)], +) -> dict: + """Execute a single MCP tool call for testing/diagnostics.""" + from navi.api.deps import _mcp_manager + + server_name = body.get("server_name") + tool_name = body.get("tool_name") + arguments = body.get("arguments", {}) + + if not server_name or not tool_name: + raise HTTPException( + status_code=400, detail="server_name and tool_name are required" + ) + + if _mcp_manager is None: + raise HTTPException(status_code=503, detail="MCP manager not initialized") + + try: + output, is_error = await _mcp_manager.call_tool( + server_name, tool_name, arguments + ) + except Exception as exc: + raise HTTPException( + status_code=502, detail=f"Tool call failed: {exc}" + ) from exc + + return { + "server_name": server_name, + "tool_name": tool_name, + "arguments": arguments, + "output": output, + "is_error": is_error, + } + + +@router.get("/profiles/{profile_id}/mcp") +async def admin_get_profile_mcp( + profile_id: str, + user: Annotated[User, Depends(require_permission("navi.profiles.manage"))], +) -> dict: + """Return the MCP group mapping for a profile.""" + from navi.api.deps import get_profile_registry + + try: + profile = get_profile_registry().get(profile_id) + except Exception: + raise HTTPException(status_code=404, detail="Profile not found") + + return {"profile_id": profile_id, "mcp_servers": profile.mcp_servers} + + +@router.put("/profiles/{profile_id}/mcp") +async def admin_update_profile_mcp( + profile_id: str, + body: dict, + user: Annotated[User, Depends(require_permission("navi.profiles.manage"))], +) -> dict: + """Update the MCP group mapping for a profile and persist to disk.""" + from pathlib import Path + + from navi.api.deps import get_profile_registry + from navi.profiles.loader import save_profile_to_dir + + try: + profile = get_profile_registry().get(profile_id) + except Exception: + raise HTTPException(status_code=404, detail="Profile not found") + + mcp_servers = body.get("mcp_servers") + if mcp_servers is None: + raise HTTPException(status_code=400, detail="mcp_servers is required") + if not isinstance(mcp_servers, dict): + raise HTTPException(status_code=400, detail="mcp_servers must be an object") + + for srv_name, groups in mcp_servers.items(): + if not isinstance(groups, list): + raise HTTPException( + status_code=400, + detail=f"Groups for server '{srv_name}' must be a list", + ) + if not all(isinstance(g, str) for g in groups): + raise HTTPException( + status_code=400, + detail=f"Groups for server '{srv_name}' must be strings", + ) + + profile.mcp_servers = mcp_servers + + profiles_dir = Path(__file__).parent.parent.parent / "profiles" + save_profile_to_dir(profile, profiles_dir) + get_profile_registry().update(profile) + + log.info("admin.profile_mcp_updated", profile_id=profile_id, admin_id=user.id) + return {"ok": True} diff --git a/navi/mcp/config.py b/navi/mcp/config.py index 239ab73..c9ed8a5 100644 --- a/navi/mcp/config.py +++ b/navi/mcp/config.py @@ -55,3 +55,17 @@ raw = json.loads(path.read_text(encoding="utf-8")) return {name: McpServerConfig.model_validate(cfg) for name, cfg in raw.items()} + + +def save_mcp_servers(configs: dict[str, McpServerConfig], path: str | Path | None = None) -> None: + """Write MCP server configurations to a JSON file. + + Default path is ``mcp_servers.json`` in the current working directory. + """ + if path is None: + path = Path("mcp_servers.json") + else: + path = Path(path) + + raw = {name: cfg.model_dump() for name, cfg in configs.items()} + path.write_text(json.dumps(raw, indent=2, ensure_ascii=False) + "\n", encoding="utf-8")