diff --git a/navi/api/routes/admin.py b/navi/api/routes/admin.py index 63001eb..55a63e7 100644 --- a/navi/api/routes/admin.py +++ b/navi/api/routes/admin.py @@ -404,7 +404,7 @@ async def admin_get_mcp_config( user: Annotated[User, Depends(require_admin)], ) -> dict: - """Return the current mcp_servers.json configuration.""" + """Return the current MCP server configurations from mcp_servers.d/.""" from navi.mcp.config import load_mcp_servers configs = load_mcp_servers() @@ -416,7 +416,7 @@ body: dict, user: Annotated[User, Depends(require_admin)], ) -> dict: - """Replace mcp_servers.json with the provided configuration.""" + """Replace all MCP server configurations (bulk update).""" from navi.mcp.config import McpServerConfig, save_mcp_servers validated: dict[str, McpServerConfig] = {} @@ -434,6 +434,62 @@ return {"ok": True} +@router.get("/mcp/config/{server_name}") +async def admin_get_single_mcp_config( + server_name: str, + user: Annotated[User, Depends(require_admin)], +) -> dict: + """Return the configuration for a single MCP server.""" + from navi.mcp.config import load_mcp_servers + + configs = load_mcp_servers() + cfg = configs.get(server_name) + if cfg is None: + raise HTTPException(status_code=404, detail=f"MCP server '{server_name}' not found") + return cfg.model_dump() + + +@router.put("/mcp/config/{server_name}") +async def admin_update_single_mcp_config( + server_name: str, + body: dict, + user: Annotated[User, Depends(require_admin)], +) -> dict: + """Create or update a single MCP server configuration.""" + from navi.mcp.config import McpServerConfig, load_mcp_servers, save_mcp_servers + + try: + validated = McpServerConfig.model_validate(body) + except Exception as exc: + raise HTTPException( + status_code=400, + detail=f"Invalid config for server '{server_name}': {exc}", + ) from exc + + configs = load_mcp_servers() + configs[server_name] = validated + save_mcp_servers(configs) + log.info("admin.mcp_config_updated_single", server=server_name, admin_id=user.id) + return {"ok": True} + + +@router.delete("/mcp/config/{server_name}") +async def admin_delete_single_mcp_config( + server_name: str, + user: Annotated[User, Depends(require_admin)], +) -> dict: + """Remove a single MCP server configuration.""" + from navi.mcp.config import load_mcp_servers, save_mcp_servers + + configs = load_mcp_servers() + if server_name not in configs: + raise HTTPException(status_code=404, detail=f"MCP server '{server_name}' not found") + del configs[server_name] + save_mcp_servers(configs) + log.info("admin.mcp_config_deleted_single", server=server_name, admin_id=user.id) + return {"ok": True} + + @router.post("/mcp/{server_name}/reconnect") async def admin_reconnect_mcp_server( server_name: str,