# Newer
# Older
# navi-1 / navi / api / routes / admin.py
"""Admin endpoints for multi-user Navi management."""

from typing import Annotated

import structlog
from fastapi import APIRouter, Depends, HTTPException

from navi.api.deps import (
    get_session_store,
    require_admin,
    require_permission,
)
from navi.auth import User
from navi.config import settings
from navi.core import SessionStore

# Module-level structlog logger used by the handlers in this module.
log = structlog.get_logger()
# Every route registered on this router is served under the "/admin" prefix
# and tagged "admin".
router = APIRouter(prefix="/admin", tags=["admin"])


@router.get("/sessions")
async def admin_list_sessions(
    store: Annotated[SessionStore, Depends(get_session_store)],
    user: Annotated[User, Depends(require_admin)],
    limit: int = 50,
    offset: int = 0,
    search: str | None = None,
    sort_by: str = "last_active",
    sort_order: str = "desc",
):
    """List sessions across all users as a single page.

    Access is gated by the ``require_admin`` dependency. ``limit``,
    ``offset``, ``sort_by`` and ``sort_order`` are forwarded to the session
    store unchanged; an empty ``search`` string is forwarded as ``None``.

    Returns:
        A dict with ``total`` (the value reported by ``store.count_all`` for
        the same search filter), the echoed ``limit`` and ``offset``, and
        ``items`` - one summary dict per session returned by
        ``store.search_list``.
    """
    # Treat "" the same as "no search term supplied".
    term = search or None

    page = await store.search_list(
        limit=limit,
        offset=offset,
        user_id=user.id,
        is_admin=True,
        search=term,
        sort_by=sort_by,
        sort_order=sort_order,
    )
    matching = await store.count_all(user_id=user.id, is_admin=True, search=term)

    summaries = []
    for entry in page:
        summaries.append(
            {
                "session_id": entry.id,
                "profile_id": entry.profile_id,
                "user_id": entry.user_id,
                "name": entry.name,
                "message_count": len(entry.messages),
                "pinned": entry.pinned,
                "created_at": entry.created_at.isoformat(),
                "last_active": entry.last_active.isoformat(),
            }
        )

    return {
        "total": matching,
        "limit": limit,
        "offset": offset,
        "items": summaries,
    }


@router.get("/sessions/{session_id}")
async def admin_get_session(
    session_id: str,
    store: Annotated[SessionStore, Depends(get_session_store)],
    user: Annotated[User, Depends(require_admin)],
) -> dict:
    """Fetch one session by id and return it with its full message list.

    Access is gated by the ``require_admin`` dependency; ``user`` itself is
    not read in the body.

    Raises:
        HTTPException: 404 when ``store.get`` returns ``None``.
    """
    found = await store.get(session_id)
    if found is None:
        raise HTTPException(status_code=404, detail="Session not found")

    # Each message is dumped in JSON mode with ``None`` fields omitted.
    dumped_messages = []
    for message in found.messages:
        dumped_messages.append(message.model_dump(mode="json", exclude_none=True))

    payload = {
        "session_id": found.id,
        "profile_id": found.profile_id,
        "user_id": found.user_id,
        "name": found.name,
        "messages": dumped_messages,
        "context_token_count": found.context_token_count,
        # The context ceiling comes from global settings, not from the session.
        "max_context_tokens": settings.ollama_num_ctx,
        "pinned": found.pinned,
        "created_at": found.created_at.isoformat(),
        "last_active": found.last_active.isoformat(),
    }
    return payload


@router.delete("/sessions/{session_id}", status_code=204)
async def admin_delete_session(
    session_id: str,
    store: Annotated[SessionStore, Depends(get_session_store)],
    user: Annotated[User, Depends(require_admin)],
) -> None:
    """Delete a session regardless of who owns it.

    The store record is removed first; only when the store reports a
    successful delete is ``delete_session_dir`` called for the same id.

    Raises:
        HTTPException: 404 when ``store.delete`` returns a falsy value.
    """
    from navi.session_files import delete_session_dir

    if await store.delete(session_id):
        await delete_session_dir(session_id)
        return
    raise HTTPException(status_code=404, detail="Session not found")


@router.get("/users")
async def admin_list_users(
    store: Annotated[SessionStore, Depends(get_session_store)],
    user: Annotated[User, Depends(require_admin)],
):
    """List every row of ``navi_users``, newest ``created_at`` first.

    Access is gated by the ``require_admin`` dependency. The query runs on a
    connection borrowed from the session store's pool.

    Returns:
        A list of dicts with ``id``, ``email``, ``display_name``, ``role``,
        ``permissions`` and ISO-formatted ``created_at`` / ``updated_at``.
    """
    query = "SELECT id, email, display_name, role, permissions, created_at, updated_at FROM navi_users ORDER BY created_at DESC"
    # Columns copied through untouched; the two timestamps are formatted below.
    passthrough = ("id", "email", "display_name", "role", "permissions")

    pool = await store._get_pool()
    async with pool.acquire() as conn:
        records = await conn.fetch(query)

    users = []
    for record in records:
        entry = {column: record[column] for column in passthrough}
        entry["created_at"] = record["created_at"].isoformat()
        entry["updated_at"] = record["updated_at"].isoformat()
        users.append(entry)
    return users


@router.get("/users/{user_id}")
async def admin_get_user(
    user_id: str,
    store: Annotated[SessionStore, Depends(get_session_store)],
    user: Annotated[User, Depends(require_admin)],
) -> dict:
    """Look up one ``navi_users`` row by id.

    Access is gated by the ``require_admin`` dependency. The query runs on a
    connection borrowed from the session store's pool.

    Raises:
        HTTPException: 404 when no row has the given ``user_id``.
    """
    query = "SELECT id, email, display_name, role, permissions, created_at, updated_at FROM navi_users WHERE id = $1"

    pool = await store._get_pool()
    async with pool.acquire() as conn:
        record = await conn.fetchrow(query, user_id)

    if record is None:
        raise HTTPException(status_code=404, detail="User not found")

    # Plain columns are copied as-is; timestamps are rendered as ISO strings.
    details = {
        column: record[column]
        for column in ("id", "email", "display_name", "role", "permissions")
    }
    details["created_at"] = record["created_at"].isoformat()
    details["updated_at"] = record["updated_at"].isoformat()
    return details


@router.get("/users/{user_id}/sessions")
async def admin_get_user_sessions(
    user_id: str,
    store: Annotated[SessionStore, Depends(get_session_store)],
    user: Annotated[User, Depends(require_admin)],
):
    """Return summaries of the sessions whose ``user_id`` matches the path.

    The store is asked for its admin listing (``is_admin=True``, using the
    calling admin's id) and the result is filtered in Python against the
    ``user_id`` path parameter.

    Returns:
        A list of session summary dicts (no ``user_id`` key, since it is
        implied by the route).
    """
    everything = await store.list_all(user_id=user.id, is_admin=True)

    owned = []
    for candidate in everything:
        if candidate.user_id == user_id:
            owned.append(
                {
                    "session_id": candidate.id,
                    "profile_id": candidate.profile_id,
                    "name": candidate.name,
                    "message_count": len(candidate.messages),
                    "pinned": candidate.pinned,
                    "created_at": candidate.created_at.isoformat(),
                    "last_active": candidate.last_active.isoformat(),
                }
            )
    return owned


@router.get("/memory")
async def admin_list_memory(
    store: Annotated[SessionStore, Depends(get_session_store)],
    user: Annotated[User, Depends(require_permission("navi.memory.read_all"))],
):
    """Return memory facts from the global memory store.

    Access requires the ``navi.memory.read_all`` permission. The memory
    store is asked for facts with ``limit=500``.

    Returns:
        A dict with the ``facts`` returned by the memory store and their
        ``count``.
    """
    from navi.api.deps import get_memory_store

    all_facts = await get_memory_store().get_all_facts(limit=500)
    return {"facts": all_facts, "count": len(all_facts)}


@router.patch("/users/{user_id}/role")
async def admin_update_user_role(
    user_id: str,
    body: dict,
    store: Annotated[SessionStore, Depends(get_session_store)],
    user: Annotated[User, Depends(require_admin)],
):
    """Update a user's cached role (requires admin).

    Args:
        user_id: Id of the ``navi_users`` row to update.
        body: JSON object; its ``role`` key must be ``"user"`` or ``"admin"``.
        store: Session store, used here only for its connection pool.
        user: The authenticated admin performing the change.

    Returns:
        ``{"ok": True}`` once the row has been updated.

    Raises:
        HTTPException: 400 when ``role`` is missing or not an allowed value;
            404 when no ``navi_users`` row has the given ``user_id``.
    """
    from datetime import datetime, timezone

    role = body.get("role")
    if role not in ("user", "admin"):
        raise HTTPException(status_code=400, detail="Invalid role")

    pool = await store._get_pool()
    async with pool.acquire() as conn:
        # RETURNING yields a row only when the UPDATE matched something, so a
        # missing user can be told apart from a successful change.
        updated = await conn.fetchrow(
            "UPDATE navi_users SET role = $1, updated_at = $2 WHERE id = $3 RETURNING id",
            role,
            datetime.now(timezone.utc),
            user_id,
        )

    if updated is None:
        # Match admin_get_user: an unknown id is a 404, not a silent success.
        raise HTTPException(status_code=404, detail="User not found")

    log.info("admin.role_updated", target_user_id=user_id, role=role, admin_id=user.id)
    return {"ok": True}


@router.get("/profiles")
async def admin_list_profiles(
    user: Annotated[User, Depends(require_permission("navi.profiles.manage"))],
):
    """List every profile in the registry, admin-only ones included.

    Access requires the ``navi.profiles.manage`` permission.

    Returns:
        A list of dicts with ``id``, ``name``, ``description`` and
        ``is_admin_only`` (``False`` when the profile lacks that attribute).
    """
    from navi.api.deps import get_profile_registry

    registry = get_profile_registry()

    listing = []
    for profile in registry.all():
        listing.append(
            {
                "id": profile.id,
                "name": profile.name,
                "description": profile.description,
                "is_admin_only": getattr(profile, "is_admin_only", False),
            }
        )
    return listing


@router.patch("/profiles/{profile_id}/availability")
async def admin_update_profile_availability(
    profile_id: str,
    body: dict,
    store: Annotated[SessionStore, Depends(get_session_store)],
    user: Annotated[User, Depends(require_permission("navi.profiles.manage"))],
):
    """Toggle admin-only visibility for a profile and persist to DB.

    Args:
        profile_id: Id of the profile whose visibility is being changed.
        body: JSON object; its ``is_admin_only`` key must be a real boolean.
        store: Session store, used here only for its connection pool.
        user: Caller holding the ``navi.profiles.manage`` permission.

    Returns:
        ``{"ok": True}`` once the override has been saved.

    Raises:
        HTTPException: 400 when ``is_admin_only`` is missing or not a bool.
    """
    is_admin_only = body.get("is_admin_only")
    if not isinstance(is_admin_only, bool):
        raise HTTPException(status_code=400, detail="is_admin_only must be a boolean")

    from navi.api.deps import get_profile_registry
    from navi.profiles._overrides import save_override

    pool = await store._get_pool()
    await save_override(pool, profile_id, is_admin_only)

    # Mutate the in-memory profile so the change is effective immediately
    # without a server restart.
    try:
        profile = get_profile_registry().get(profile_id)
        profile.is_admin_only = is_admin_only
    except Exception:
        # Still best-effort (the profile may not be loaded and the DB value is
        # the source of truth), but leave a trace instead of failing silently.
        log.warning(
            "admin.profile_availability_memory_update_failed",
            profile_id=profile_id,
            exc_info=True,
        )

    log.info(
        "admin.profile_availability",
        profile_id=profile_id,
        is_admin_only=is_admin_only,
        admin_id=user.id,
    )
    return {"ok": True}