Newer
Older
navi-1 / navi / api / routes / admin.py
@Eugene Sukhodolskiy Eugene Sukhodolskiy on 25 May 24 KB Fix 19 issues found in full codebase review
"""Admin endpoints for multi-user Navi management."""

import re
from typing import Annotated

import structlog
from fastapi import APIRouter, Depends, HTTPException

from navi.api.deps import (
    get_scheduler,
    get_session_store,
    require_admin,
    require_permission,
)
from navi.auth import User
from navi.config import settings
from navi.core import SessionStore
from navi.core.scheduler import RecallScheduler

# Module-level structured logger; the endpoints below use it for admin audit events.
log = structlog.get_logger()
# Every route declared in this module is served under the /admin prefix.
router = APIRouter(prefix="/admin", tags=["admin"])

# Character whitelist for MCP server names (word characters and hyphens).
# NOTE(review): `\w` already includes "_" and, for str patterns, also matches
# non-ASCII letters/digits — confirm whether Unicode names are intended.
_MCP_NAME_RE = re.compile(r"^[\w_-]+$")


def _validate_mcp_name(name: str) -> None:
    """Reject MCP server names containing characters outside the allowed set.

    ``fullmatch`` is used instead of ``match``: with ``match`` the pattern's
    trailing ``$`` also succeeds just before a final newline, so a name that
    ends in a newline character would slip through validation.

    Raises:
        HTTPException: 400 when *name* is not made up entirely of characters
            accepted by ``_MCP_NAME_RE``.
    """
    if _MCP_NAME_RE.fullmatch(name) is None:
        raise HTTPException(
            status_code=400,
            detail="Invalid server name. Use only letters, digits, underscores, and hyphens.",
        )


@router.get("/sessions")
async def admin_list_sessions(
    store: Annotated[SessionStore, Depends(get_session_store)],
    user: Annotated[User, Depends(require_admin)],
    limit: int = 50,
    offset: int = 0,
    search: str | None = None,
    sort_by: str = "last_active",
    sort_order: str = "desc",
):
    """Return one page of sessions across all users together with the match count.

    Both store calls are made with ``is_admin=True``. ``sort_by`` and
    ``sort_order`` are forwarded to the store unchanged; an empty ``search``
    string is treated the same as no search filter.
    """
    term = search or None  # "" collapses to None
    page = await store.search_list(
        limit=limit,
        offset=offset,
        user_id=user.id,
        is_admin=True,
        search=term,
        sort_by=sort_by,
        sort_order=sort_order,
    )
    matched = await store.count_all(user_id=user.id, is_admin=True, search=term)

    items = []
    for entry in page:
        items.append(
            {
                "session_id": entry.id,
                "profile_id": entry.profile_id,
                "user_id": entry.user_id,
                "name": entry.name,
                "message_count": len(entry.messages),
                "pinned": entry.pinned,
                "created_at": entry.created_at.isoformat(),
                "last_active": entry.last_active.isoformat(),
            }
        )

    return {"total": matched, "limit": limit, "offset": offset, "items": items}


@router.get("/sessions/{session_id}")
async def admin_get_session(
    session_id: str,
    store: Annotated[SessionStore, Depends(get_session_store)],
    user: Annotated[User, Depends(require_admin)],
) -> dict:
    """Return one session in full, including its serialized messages.

    Raises:
        HTTPException: 404 when the store has no session with this id.
    """
    record = await store.get(session_id)
    if record is None:
        raise HTTPException(status_code=404, detail="Session not found")

    payload = {
        "session_id": record.id,
        "profile_id": record.profile_id,
        "user_id": record.user_id,
        "name": record.name,
    }
    # Messages are dumped in JSON mode with None-valued fields left out.
    payload["messages"] = [
        message.model_dump(mode="json", exclude_none=True)
        for message in record.messages
    ]
    payload["context_token_count"] = record.context_token_count
    payload["max_context_tokens"] = settings.ollama_num_ctx
    payload["pinned"] = record.pinned
    payload["created_at"] = record.created_at.isoformat()
    payload["last_active"] = record.last_active.isoformat()
    return payload


@router.delete("/sessions/{session_id}", status_code=204)
async def admin_delete_session(
    session_id: str,
    store: Annotated[SessionStore, Depends(get_session_store)],
    user: Annotated[User, Depends(require_admin)],
) -> None:
    """Delete a session regardless of owner, then remove its session directory.

    Raises:
        HTTPException: 404 when the store reports that nothing was deleted.
    """
    from navi.session_files import delete_session_dir

    if not await store.delete(session_id):
        raise HTTPException(status_code=404, detail="Session not found")

    # The directory is only removed once the store deletion has succeeded.
    await delete_session_dir(session_id)


@router.get("/users")
async def admin_list_users(
    store: Annotated[SessionStore, Depends(get_session_store)],
    user: Annotated[User, Depends(require_admin)],
):
    """List every row of ``navi_users``, newest ``created_at`` first."""
    query = "SELECT id, email, display_name, role, permissions, created_at, updated_at FROM navi_users ORDER BY created_at DESC"
    pool = await store._get_pool()
    async with pool.acquire() as conn:
        records = await conn.fetch(query)

    # Columns copied through as-is; the two timestamps are ISO-formatted below.
    plain_columns = ("id", "email", "display_name", "role", "permissions")
    users = []
    for record in records:
        entry = {column: record[column] for column in plain_columns}
        entry["created_at"] = record["created_at"].isoformat()
        entry["updated_at"] = record["updated_at"].isoformat()
        users.append(entry)
    return users


@router.get("/users/{user_id}")
async def admin_get_user(
    user_id: str,
    store: Annotated[SessionStore, Depends(get_session_store)],
    user: Annotated[User, Depends(require_admin)],
) -> dict:
    """Return the ``navi_users`` row for one user id.

    Raises:
        HTTPException: 404 when no row has this id.
    """
    query = "SELECT id, email, display_name, role, permissions, created_at, updated_at FROM navi_users WHERE id = $1"
    pool = await store._get_pool()
    async with pool.acquire() as conn:
        record = await conn.fetchrow(query, user_id)

    if record is None:
        raise HTTPException(status_code=404, detail="User not found")

    # Plain columns are copied through; timestamps are rendered as ISO strings.
    details = {
        column: record[column]
        for column in ("id", "email", "display_name", "role", "permissions")
    }
    details["created_at"] = record["created_at"].isoformat()
    details["updated_at"] = record["updated_at"].isoformat()
    return details


@router.get("/users/{user_id}/sessions")
async def admin_get_user_sessions(
    user_id: str,
    store: Annotated[SessionStore, Depends(get_session_store)],
    user: Annotated[User, Depends(require_admin)],
):
    """Return summaries of the sessions whose ``user_id`` equals the path parameter.

    The full admin listing is fetched from the store and filtered here.
    """
    every_session = await store.list_all(user_id=user.id, is_admin=True)

    summaries = []
    for candidate in every_session:
        if candidate.user_id != user_id:
            continue  # belongs to someone else
        summaries.append(
            {
                "session_id": candidate.id,
                "profile_id": candidate.profile_id,
                "name": candidate.name,
                "message_count": len(candidate.messages),
                "pinned": candidate.pinned,
                "created_at": candidate.created_at.isoformat(),
                "last_active": candidate.last_active.isoformat(),
            }
        )
    return summaries


@router.get("/memory")
async def admin_list_memory(
    store: Annotated[SessionStore, Depends(get_session_store)],
    user: Annotated[User, Depends(require_permission("navi.memory.read_all"))],
    limit: int = 50,
    offset: int = 0,
    search: str | None = None,
    sort_by: str = "updated_at",
    sort_order: str = "desc",
    user_id: str | None = None,
):
    """Return one page of memory facts plus the total count.

    When ``user_id`` is omitted the memory store is queried with
    ``all_users=True``; otherwise the given ``user_id`` is passed along.
    An empty ``search`` string is treated as no search filter.
    """
    from navi.api.deps import get_memory_store

    memory = get_memory_store()
    term = search or None
    across_users = user_id is None

    facts = await memory.get_all_facts(
        limit=limit,
        offset=offset,
        search=term,
        sort_by=sort_by,
        sort_order=sort_order,
        user_id=user_id,
        all_users=across_users,
    )
    total = await memory.fact_count(
        user_id=user_id, all_users=across_users, search=term
    )
    return dict(total=total, limit=limit, offset=offset, items=facts)


@router.patch("/users/{user_id}/role")
async def admin_update_user_role(
    user_id: str,
    body: dict,
    store: Annotated[SessionStore, Depends(get_session_store)],
    user: Annotated[User, Depends(require_admin)],
):
    """Update a user's cached role (requires admin).

    The request body must carry ``"role"`` set to ``"user"`` or ``"admin"``.
    ``updated_at`` is set to the current UTC time in the same statement.

    Raises:
        HTTPException: 400 when the role is missing or not one of the two
            accepted values; 404 when no ``navi_users`` row has this id.
    """
    from datetime import datetime, timezone

    role = body.get("role")
    if role not in ("user", "admin"):
        raise HTTPException(status_code=400, detail="Invalid role")

    pool = await store._get_pool()
    async with pool.acquire() as conn:
        # RETURNING lets us tell "row updated" apart from "no such user";
        # a plain UPDATE would silently succeed with zero affected rows.
        updated = await conn.fetchrow(
            "UPDATE navi_users SET role = $1, updated_at = $2 WHERE id = $3 RETURNING id",
            role,
            datetime.now(timezone.utc),
            user_id,
        )
    if updated is None:
        raise HTTPException(status_code=404, detail="User not found")

    log.info("admin.role_updated", target_user_id=user_id, role=role, admin_id=user.id)
    return {"ok": True}


@router.post("/ollama/clear-blacklists", status_code=204)
async def admin_clear_ollama_blacklists(
    user: Annotated[User, Depends(require_admin)],
) -> None:
    """Reset the blacklists kept by the Ollama fallback backend and log who did it."""
    from navi.llm.fallback import clear_blacklists as reset_blacklists

    reset_blacklists()
    log.info("admin.ollama_blacklists_cleared", admin_id=user.id)


@router.get("/profiles")
async def admin_list_profiles(
    user: Annotated[User, Depends(require_permission("navi.profiles.manage"))],
):
    """List every registered profile with its admin-only flag.

    Profiles that lack an ``is_admin_only`` attribute are reported as ``False``.
    """
    from navi.api.deps import get_profile_registry

    listing = []
    for profile in get_profile_registry().all():
        listing.append(
            {
                "id": profile.id,
                "name": profile.name,
                "description": profile.description,
                "is_admin_only": getattr(profile, "is_admin_only", False),
            }
        )
    return listing


@router.patch("/profiles/{profile_id}/availability")
async def admin_update_profile_availability(
    profile_id: str,
    body: dict,
    store: Annotated[SessionStore, Depends(get_session_store)],
    user: Annotated[User, Depends(require_permission("navi.profiles.manage"))],
):
    """Toggle admin-only visibility for a profile and persist to DB.

    The override is saved first; afterwards the in-memory profile is updated
    on a best-effort basis so the change applies without a restart.

    Raises:
        HTTPException: 400 when ``is_admin_only`` in the body is not a boolean.
    """
    is_admin_only = body.get("is_admin_only")
    if not isinstance(is_admin_only, bool):
        raise HTTPException(status_code=400, detail="is_admin_only must be a boolean")

    from navi.api.deps import get_profile_registry
    from navi.profiles._overrides import save_override

    pool = await store._get_pool()
    await save_override(pool, profile_id, is_admin_only)

    # Mutate the in-memory profile so the change is effective immediately
    # without a server restart.
    try:
        profile = get_profile_registry().get(profile_id)
        profile.is_admin_only = is_admin_only
    except Exception as exc:
        # Still best-effort (the DB value is the source of truth), but leave a
        # trace instead of discarding the failure silently.
        log.warning(
            "admin.profile_availability_memory_update_failed",
            profile_id=profile_id,
            error=str(exc),
        )

    log.info(
        "admin.profile_availability",
        profile_id=profile_id,
        is_admin_only=is_admin_only,
        admin_id=user.id,
    )
    return {"ok": True}


@router.get("/profiles/{profile_id}")
async def admin_get_profile(
    profile_id: str,
    user: Annotated[User, Depends(require_permission("navi.profiles.manage"))],
):
    """Return full profile configuration including system prompt.

    Raises:
        HTTPException: 404 when the registry lookup for ``profile_id`` fails.
    """
    from navi.api.deps import get_profile_registry

    try:
        profile = get_profile_registry().get(profile_id)
    except Exception as exc:
        # Chain the cause so the original lookup error stays in tracebacks.
        raise HTTPException(status_code=404, detail="Profile not found") from exc

    # Attributes exported verbatim, in response order.
    exported_fields = (
        "id",
        "name",
        "description",
        "short_description",
        "full_description",
        "system_prompt",
        "subagent_system_prompt",
        "llm_backend",
        "model",
        "temperature",
        "top_k",
        "top_p",
        "num_thread",
        "max_iterations",
        "planning_enabled",
        "planning_mandatory",
        "planning_phase1_enabled",
        "planning_phase2_enabled",
        "planning_phase3_enabled",
        "think_enabled",
        "iteration_budget_enabled",
        "goal_anchoring_enabled",
        "goal_anchoring_interval",
        "anti_stall_enabled",
        "anti_stall_threshold",
        "step_validation_enabled",
        "adaptive_replan_enabled",
        "subagent_tools",
        "subagent_planning_enabled",
        "subagent_think_enabled",
        "enabled_tools",
        "context_providers",
    )
    payload = {field: getattr(profile, field) for field in exported_fields}
    # is_admin_only may be absent on a profile object; report False then.
    payload["is_admin_only"] = getattr(profile, "is_admin_only", False)
    return payload


@router.put("/profiles/{profile_id}")
async def admin_update_profile(
    profile_id: str,
    body: dict,
    user: Annotated[User, Depends(require_permission("navi.profiles.manage"))],
):
    """Update profile configuration on disk and in-memory.

    Accepts a partial update — only provided fields are modified. The merged
    data is re-validated as an ``AgentProfile``, written via
    ``save_profile_to_dir`` into the ``profiles`` directory three levels above
    this file, and then pushed into the in-memory registry.

    Raises:
        HTTPException: 404 when the profile lookup fails; 400 when the merged
            data does not validate.
    """
    from pathlib import Path

    from navi.api.deps import get_profile_registry
    from navi.profiles.base import AgentProfile
    from navi.profiles.loader import save_profile_to_dir

    registry = get_profile_registry()
    try:
        old_profile = registry.get(profile_id)
    except Exception as exc:
        # Chain the cause so the original lookup error stays in tracebacks.
        raise HTTPException(status_code=404, detail="Profile not found") from exc

    # Build updated profile from existing data + body overrides
    updated_data = old_profile.model_dump()
    updated_data.update(body)

    # Only `id` is forced back to the path value. The setdefault calls merely
    # fill keys that are absent after the merge; they never override a value
    # supplied in the request body.
    updated_data["id"] = profile_id
    updated_data.setdefault("name", old_profile.name)
    updated_data.setdefault("description", old_profile.description)
    updated_data.setdefault("enabled_tools", old_profile.enabled_tools)
    updated_data.setdefault("system_prompt", old_profile.system_prompt)

    try:
        updated_profile = AgentProfile.model_validate(updated_data)
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Invalid profile data: {e}") from e

    # Preserve is_admin_only (not part of base model validation)
    if hasattr(old_profile, "is_admin_only"):
        updated_profile.is_admin_only = old_profile.is_admin_only

    # Write to disk
    profiles_dir = Path(__file__).parent.parent.parent / "profiles"
    save_profile_to_dir(updated_profile, profiles_dir)

    # Update in-memory registry
    registry.update(updated_profile)

    log.info("admin.profile_updated", profile_id=profile_id, admin_id=user.id)
    return {"ok": True}


# ── MCP administration ──────────────────────────────────────────────────────


@router.get("/mcp/config")
async def admin_get_mcp_config(
    user: Annotated[User, Depends(require_admin)],
) -> dict:
    """Return every MCP server configuration found by ``load_mcp_servers``, keyed by server name."""
    from navi.mcp.config import load_mcp_servers

    dumped = {}
    for server_name, server_cfg in load_mcp_servers().items():
        dumped[server_name] = server_cfg.model_dump()
    return dumped


@router.put("/mcp/config")
async def admin_update_mcp_config(
    body: dict,
    user: Annotated[User, Depends(require_admin)],
) -> dict:
    """Validate every entry of the body and save the result as the MCP server set.

    Nothing is saved unless all names and configs pass validation.

    Raises:
        HTTPException: 400 for an invalid server name or an invalid config.
    """
    from navi.mcp.config import McpServerConfig, save_mcp_servers

    parsed: dict[str, McpServerConfig] = {}
    for name in body:
        _validate_mcp_name(name)
        try:
            server_cfg = McpServerConfig.model_validate(body[name])
        except Exception as exc:
            message = f"Invalid config for server '{name}': {exc}"
            raise HTTPException(status_code=400, detail=message) from exc
        parsed[name] = server_cfg

    save_mcp_servers(parsed)
    log.info("admin.mcp_config_updated", admin_id=user.id)
    return {"ok": True}


@router.get("/mcp/config/{server_name}")
async def admin_get_single_mcp_config(
    server_name: str,
    user: Annotated[User, Depends(require_admin)],
) -> dict:
    """Return the configuration for a single MCP server.

    Raises:
        HTTPException: 400 when the name fails validation; 404 when no such
            server is configured.
    """
    # The docstring must be the first statement to become __doc__; the
    # validation call therefore comes after it.
    _validate_mcp_name(server_name)

    from navi.mcp.config import load_mcp_servers

    configs = load_mcp_servers()
    cfg = configs.get(server_name)
    if cfg is None:
        raise HTTPException(status_code=404, detail=f"MCP server '{server_name}' not found")
    return cfg.model_dump()


@router.put("/mcp/config/{server_name}")
async def admin_update_single_mcp_config(
    server_name: str,
    body: dict,
    user: Annotated[User, Depends(require_admin)],
) -> dict:
    """Create or update a single MCP server configuration.

    The current configs are loaded, the entry for ``server_name`` is set to
    the validated body, and the whole mapping is saved back.

    Raises:
        HTTPException: 400 when the name or the config body is invalid.
    """
    # The docstring must be the first statement to become __doc__; the
    # validation call therefore comes after it.
    _validate_mcp_name(server_name)

    from navi.mcp.config import McpServerConfig, load_mcp_servers, save_mcp_servers

    try:
        validated = McpServerConfig.model_validate(body)
    except Exception as exc:
        raise HTTPException(
            status_code=400,
            detail=f"Invalid config for server '{server_name}': {exc}",
        ) from exc

    configs = load_mcp_servers()
    configs[server_name] = validated
    save_mcp_servers(configs)
    log.info("admin.mcp_config_updated_single", server=server_name, admin_id=user.id)
    return {"ok": True}


@router.delete("/mcp/config/{server_name}")
async def admin_delete_single_mcp_config(
    server_name: str,
    user: Annotated[User, Depends(require_admin)],
) -> dict:
    """Remove a single MCP server configuration.

    Raises:
        HTTPException: 400 when the name fails validation; 404 when no such
            server is configured.
    """
    # The docstring must be the first statement to become __doc__; the
    # validation call therefore comes after it.
    _validate_mcp_name(server_name)

    from navi.mcp.config import load_mcp_servers, save_mcp_servers

    configs = load_mcp_servers()
    if server_name not in configs:
        raise HTTPException(status_code=404, detail=f"MCP server '{server_name}' not found")
    del configs[server_name]
    save_mcp_servers(configs)
    log.info("admin.mcp_config_deleted_single", server=server_name, admin_id=user.id)
    return {"ok": True}


@router.post("/mcp/{server_name}/reconnect")
async def admin_reconnect_mcp_server(
    server_name: str,
    user: Annotated[User, Depends(require_admin)],
) -> dict:
    """Disconnect and reconnect a single MCP server, then re-register its tools.

    Steps: look the server up in the config at ``mcp_manager.config_path``,
    drop and disconnect any existing client, unregister the external tools
    whose names start with this server's prefix, connect a fresh client, and
    register the tools it lists. A failure while listing tools after a
    successful connect is logged and the endpoint still returns ``ok``.

    Raises:
        HTTPException: 503 when the MCP manager is not initialized; 404 when
            the server is not in the config; 502 when connecting fails.
    """
    from navi.api.deps import get_mcp_manager, get_tool_registry
    from navi.mcp.client import McpClient
    from navi.mcp.config import load_mcp_servers
    from navi.mcp.tools import McpTool, build_mcp_name

    mcp_manager = get_mcp_manager()
    if mcp_manager is None:
        raise HTTPException(status_code=503, detail="MCP manager not initialized")

    configs = load_mcp_servers(mcp_manager.config_path)
    cfg = configs.get(server_name)
    if cfg is None:
        raise HTTPException(
            status_code=404, detail=f"MCP server '{server_name}' not found in config"
        )

    # Drop old client
    old_client = mcp_manager.clients.pop(server_name, None)
    if old_client:
        try:
            await old_client.disconnect()
        except Exception as exc:
            # Best-effort: a broken old connection must not block the
            # reconnect, but the failure is recorded instead of discarded.
            log.warning(
                "admin.mcp_disconnect_failed", server=server_name, error=str(exc)
            )

    # Unregister old tools for this server
    tool_registry = get_tool_registry()
    prefix = build_mcp_name(server_name, "")
    # Iterate over a copy: unregistering is expected to mutate the name set.
    for name in list(tool_registry._external_names):
        if name.startswith(prefix):
            tool_registry.unregister_external(name)

    # Connect fresh
    client = McpClient(server_name, cfg)
    try:
        await client.connect()
        mcp_manager.clients[server_name] = client
    except Exception as exc:
        log.warning("admin.mcp_reconnect_failed", server=server_name, error=str(exc))
        raise HTTPException(
            status_code=502, detail=f"Reconnect failed: {exc}"
        ) from exc

    # Re-register tools
    try:
        tools = await client.list_tools()
        for tool in tools:
            mcp_tool = McpTool(
                server_name=server_name,
                tool_name=tool.name,
                description=tool.description or "",
                parameters=tool.inputSchema,
                manager=mcp_manager,
            )
            tool_registry.register_external(mcp_tool)
    except Exception as exc:
        log.warning(
            "admin.mcp_list_tools_failed_after_reconnect",
            server=server_name,
            error=str(exc),
        )

    log.info("admin.mcp_reconnected", server=server_name, admin_id=user.id)
    return {"ok": True}


@router.get("/mcp/status")
async def admin_get_mcp_status(
    user: Annotated[User, Depends(require_admin)],
) -> dict:
    """Report, for each configured MCP server, whether it is connected and how many tools it lists.

    Servers without a connected client get ``tool_count`` 0. If listing the
    tools of a connected client fails, the error text is placed in ``error``.
    """
    from navi.api.deps import get_mcp_manager
    from navi.mcp.config import load_mcp_servers

    configs = load_mcp_servers()
    manager = get_mcp_manager()

    report = []
    for name, cfg in configs.items():
        client = None
        if manager:
            client = manager.clients.get(name)

        entry: dict = {
            "name": name,
            "transport": cfg.transport,
            "connected": client.connected if client else False,
            "tool_count": 0,
            "instructions": None,
            "error": None,
        }

        if client and client.connected:
            entry["instructions"] = client.instructions
            try:
                listed = await client.list_tools()
                entry["tool_count"] = len(listed)
            except Exception as exc:
                entry["error"] = str(exc)

        report.append(entry)

    return {"servers": report}


@router.post("/mcp/test")
async def admin_test_mcp_tool(
    body: dict,
    user: Annotated[User, Depends(require_admin)],
) -> dict:
    """Run one MCP tool call through the manager and echo the request with its result.

    The body supplies ``server_name``, ``tool_name`` and optional
    ``arguments`` (defaulting to an empty dict).

    Raises:
        HTTPException: 400 when ``server_name`` or ``tool_name`` is missing
            or empty; 503 when the MCP manager is not initialized; 502 when
            the tool call raises.
    """
    from navi.api.deps import get_mcp_manager

    server_name = body.get("server_name")
    tool_name = body.get("tool_name")
    arguments = body.get("arguments", {})

    if not (server_name and tool_name):
        raise HTTPException(
            status_code=400, detail="server_name and tool_name are required"
        )

    manager = get_mcp_manager()
    if manager is None:
        raise HTTPException(status_code=503, detail="MCP manager not initialized")

    try:
        output, is_error = await manager.call_tool(server_name, tool_name, arguments)
    except Exception as exc:
        failure = f"Tool call failed: {exc}"
        raise HTTPException(status_code=502, detail=failure) from exc

    return dict(
        server_name=server_name,
        tool_name=tool_name,
        arguments=arguments,
        output=output,
        is_error=is_error,
    )


@router.get("/profiles/{profile_id}/mcp")
async def admin_get_profile_mcp(
    profile_id: str,
    user: Annotated[User, Depends(require_permission("navi.profiles.manage"))],
) -> dict:
    """Return the MCP group mapping for a profile.

    Raises:
        HTTPException: 404 when the registry lookup for ``profile_id`` fails.
    """
    from navi.api.deps import get_profile_registry

    try:
        profile = get_profile_registry().get(profile_id)
    except Exception as exc:
        # Chain the cause so the original lookup error stays in tracebacks.
        raise HTTPException(status_code=404, detail="Profile not found") from exc

    return {"profile_id": profile_id, "mcp_servers": profile.mcp_servers}


@router.put("/profiles/{profile_id}/mcp")
async def admin_update_profile_mcp(
    profile_id: str,
    body: dict,
    user: Annotated[User, Depends(require_permission("navi.profiles.manage"))],
) -> dict:
    """Update the MCP group mapping for a profile and persist to disk.

    ``body["mcp_servers"]`` must be an object mapping each server name to a
    list of group-name strings. If writing the profile to disk fails, the
    in-memory mapping is restored to its previous value before the error
    propagates.

    Raises:
        HTTPException: 404 when the profile lookup fails; 400 when
            ``mcp_servers`` is missing or has the wrong shape.
    """
    from pathlib import Path

    from navi.api.deps import get_profile_registry
    from navi.profiles.loader import save_profile_to_dir

    try:
        profile = get_profile_registry().get(profile_id)
    except Exception as exc:
        # Chain the cause so the original lookup error stays in tracebacks.
        raise HTTPException(status_code=404, detail="Profile not found") from exc

    mcp_servers = body.get("mcp_servers")
    if mcp_servers is None:
        raise HTTPException(status_code=400, detail="mcp_servers is required")
    if not isinstance(mcp_servers, dict):
        raise HTTPException(status_code=400, detail="mcp_servers must be an object")

    for srv_name, groups in mcp_servers.items():
        if not isinstance(groups, list):
            raise HTTPException(
                status_code=400,
                detail=f"Groups for server '{srv_name}' must be a list",
            )
        if not all(isinstance(g, str) for g in groups):
            raise HTTPException(
                status_code=400,
                detail=f"Groups for server '{srv_name}' must be strings",
            )

    previous_mapping = profile.mcp_servers
    profile.mcp_servers = mcp_servers

    profiles_dir = Path(__file__).parent.parent.parent / "profiles"
    try:
        save_profile_to_dir(profile, profiles_dir)
    except Exception:
        # Keep memory and disk consistent: undo the in-memory change when the
        # write did not succeed, then let the original error surface.
        profile.mcp_servers = previous_mapping
        raise
    get_profile_registry().update(profile)

    log.info("admin.profile_mcp_updated", profile_id=profile_id, admin_id=user.id)
    return {"ok": True}


@router.get("/recalls")
async def admin_list_recalls(
    user: Annotated[User, Depends(require_admin)],
    scheduler: Annotated[RecallScheduler, Depends(get_scheduler)],
    limit: int = 50,
    offset: int = 0,
    session_id: str | None = None,
    user_id: str | None = None,
):
    """Return one page of scheduled recalls, optionally filtered by session or user.

    The scheduler is queried with ``is_admin=True``; the response echoes
    ``limit`` and ``offset`` alongside the serialized items.
    """
    page = await scheduler.list_recalls(
        session_id=session_id,
        user_id=user_id,
        is_admin=True,
        limit=limit,
        offset=offset,
    )

    items = []
    for recall in page:
        items.append(
            {
                "id": recall.id,
                "session_id": recall.session_id,
                "call_type": recall.call_type,
                "trigger_at": recall.trigger_at.isoformat(),
                "interval_seconds": recall.interval_seconds,
                "internal_comment": recall.internal_comment,
                "additional_context_message": recall.additional_context_message,
                "status": recall.status,
                "created_at": recall.created_at.isoformat(),
                "updated_at": recall.updated_at.isoformat(),
            }
        )

    return {"limit": limit, "offset": offset, "items": items}