"""Admin endpoints for multi-user Navi management."""
from typing import Annotated
import structlog
from fastapi import APIRouter, Depends, HTTPException
from navi.api.deps import (
get_scheduler,
get_session_store,
require_admin,
require_permission,
)
from navi.auth import User
from navi.config import settings
from navi.core import SessionStore
from navi.core.scheduler import RecallScheduler
log = structlog.get_logger()
router = APIRouter(prefix="/admin", tags=["admin"])
@router.get("/sessions")
async def admin_list_sessions(
store: Annotated[SessionStore, Depends(get_session_store)],
user: Annotated[User, Depends(require_admin)],
limit: int = 50,
offset: int = 0,
search: str | None = None,
sort_by: str = "last_active",
sort_order: str = "desc",
):
"""Return all sessions across all users with pagination, search and sorting."""
sessions = await store.search_list(
limit=limit,
offset=offset,
user_id=user.id,
is_admin=True,
search=search or None,
sort_by=sort_by,
sort_order=sort_order,
)
total = await store.count_all(
user_id=user.id, is_admin=True, search=search or None
)
return {
"total": total,
"limit": limit,
"offset": offset,
"items": [
{
"session_id": s.id,
"profile_id": s.profile_id,
"user_id": s.user_id,
"name": s.name,
"message_count": len(s.messages),
"pinned": s.pinned,
"created_at": s.created_at.isoformat(),
"last_active": s.last_active.isoformat(),
}
for s in sessions
],
}
@router.get("/sessions/{session_id}")
async def admin_get_session(
session_id: str,
store: Annotated[SessionStore, Depends(get_session_store)],
user: Annotated[User, Depends(require_admin)],
) -> dict:
"""Return full session details including messages."""
session = await store.get(session_id)
if session is None:
raise HTTPException(status_code=404, detail="Session not found")
return {
"session_id": session.id,
"profile_id": session.profile_id,
"user_id": session.user_id,
"name": session.name,
"messages": [m.model_dump(mode="json", exclude_none=True) for m in session.messages],
"context_token_count": session.context_token_count,
"max_context_tokens": settings.ollama_num_ctx,
"pinned": session.pinned,
"created_at": session.created_at.isoformat(),
"last_active": session.last_active.isoformat(),
}
@router.delete("/sessions/{session_id}", status_code=204)
async def admin_delete_session(
session_id: str,
store: Annotated[SessionStore, Depends(get_session_store)],
user: Annotated[User, Depends(require_admin)],
) -> None:
"""Delete any session (bypass ownership)."""
from navi.session_files import delete_session_dir
deleted = await store.delete(session_id)
if not deleted:
raise HTTPException(status_code=404, detail="Session not found")
await delete_session_dir(session_id)
@router.get("/users")
async def admin_list_users(
store: Annotated[SessionStore, Depends(get_session_store)],
user: Annotated[User, Depends(require_admin)],
):
"""Return all registered navi_users."""
pool = await store._get_pool()
async with pool.acquire() as conn:
rows = await conn.fetch(
"SELECT id, email, display_name, role, permissions, created_at, updated_at FROM navi_users ORDER BY created_at DESC"
)
return [
{
"id": r["id"],
"email": r["email"],
"display_name": r["display_name"],
"role": r["role"],
"permissions": r["permissions"],
"created_at": r["created_at"].isoformat(),
"updated_at": r["updated_at"].isoformat(),
}
for r in rows
]
@router.get("/users/{user_id}")
async def admin_get_user(
user_id: str,
store: Annotated[SessionStore, Depends(get_session_store)],
user: Annotated[User, Depends(require_admin)],
) -> dict:
"""Return single user details."""
pool = await store._get_pool()
async with pool.acquire() as conn:
row = await conn.fetchrow(
"SELECT id, email, display_name, role, permissions, created_at, updated_at FROM navi_users WHERE id = $1",
user_id,
)
if row is None:
raise HTTPException(status_code=404, detail="User not found")
return {
"id": row["id"],
"email": row["email"],
"display_name": row["display_name"],
"role": row["role"],
"permissions": row["permissions"],
"created_at": row["created_at"].isoformat(),
"updated_at": row["updated_at"].isoformat(),
}
@router.get("/users/{user_id}/sessions")
async def admin_get_user_sessions(
user_id: str,
store: Annotated[SessionStore, Depends(get_session_store)],
user: Annotated[User, Depends(require_admin)],
):
"""Return sessions owned by a specific user."""
sessions = await store.list_all(user_id=user.id, is_admin=True)
user_sessions = [s for s in sessions if s.user_id == user_id]
return [
{
"session_id": s.id,
"profile_id": s.profile_id,
"name": s.name,
"message_count": len(s.messages),
"pinned": s.pinned,
"created_at": s.created_at.isoformat(),
"last_active": s.last_active.isoformat(),
}
for s in user_sessions
]
@router.get("/memory")
async def admin_list_memory(
store: Annotated[SessionStore, Depends(get_session_store)],
user: Annotated[User, Depends(require_permission("navi.memory.read_all"))],
limit: int = 50,
offset: int = 0,
search: str | None = None,
sort_by: str = "updated_at",
sort_order: str = "desc",
user_id: str | None = None,
):
"""Return memory facts with pagination, search and sorting."""
from navi.api.deps import get_memory_store
memory = get_memory_store()
facts = await memory.get_all_facts(
limit=limit,
offset=offset,
search=search or None,
sort_by=sort_by,
sort_order=sort_order,
user_id=user_id,
all_users=user_id is None,
)
total = await memory.fact_count(
user_id=user_id,
all_users=user_id is None,
search=search or None,
)
return {"total": total, "limit": limit, "offset": offset, "items": facts}
@router.patch("/users/{user_id}/role")
async def admin_update_user_role(
user_id: str,
body: dict,
store: Annotated[SessionStore, Depends(get_session_store)],
user: Annotated[User, Depends(require_admin)],
):
"""Update a user's cached role (requires admin)."""
role = body.get("role")
if role not in ("user", "admin"):
raise HTTPException(status_code=400, detail="Invalid role")
pool = await store._get_pool()
async with pool.acquire() as conn:
await conn.execute(
"UPDATE navi_users SET role = $1, updated_at = $2 WHERE id = $3",
role,
__import__("datetime").datetime.now(__import__("datetime").timezone.utc),
user_id,
)
log.info("admin.role_updated", target_user_id=user_id, role=role, admin_id=user.id)
return {"ok": True}
@router.post("/ollama/clear-blacklists", status_code=204)
async def admin_clear_ollama_blacklists(
user: Annotated[User, Depends(require_admin)],
) -> None:
"""Clear dead-server and dead-model blacklists for the Ollama fallback backend."""
from navi.llm.fallback import clear_blacklists
clear_blacklists()
log.info("admin.ollama_blacklists_cleared", admin_id=user.id)
@router.get("/profiles")
async def admin_list_profiles(
user: Annotated[User, Depends(require_permission("navi.profiles.manage"))],
):
"""Return all profiles including admin-only ones."""
from navi.api.deps import get_profile_registry
profiles = get_profile_registry()
return [
{
"id": p.id,
"name": p.name,
"description": p.description,
"is_admin_only": getattr(p, "is_admin_only", False),
}
for p in profiles.all()
]
@router.patch("/profiles/{profile_id}/availability")
async def admin_update_profile_availability(
profile_id: str,
body: dict,
store: Annotated[SessionStore, Depends(get_session_store)],
user: Annotated[User, Depends(require_permission("navi.profiles.manage"))],
):
"""Toggle admin-only visibility for a profile and persist to DB."""
is_admin_only = body.get("is_admin_only")
if not isinstance(is_admin_only, bool):
raise HTTPException(status_code=400, detail="is_admin_only must be a boolean")
from navi.api.deps import get_profile_registry
from navi.profiles._overrides import save_override
pool = await store._get_pool()
await save_override(pool, profile_id, is_admin_only)
# Mutate the in-memory profile so the change is effective immediately
# without a server restart.
try:
profile = get_profile_registry().get(profile_id)
profile.is_admin_only = is_admin_only
except Exception:
pass # profile may not be loaded; DB value is the source of truth anyway
log.info(
"admin.profile_availability",
profile_id=profile_id,
is_admin_only=is_admin_only,
admin_id=user.id,
)
return {"ok": True}
@router.get("/profiles/{profile_id}")
async def admin_get_profile(
profile_id: str,
user: Annotated[User, Depends(require_permission("navi.profiles.manage"))],
):
"""Return full profile configuration including system prompt."""
from navi.api.deps import get_profile_registry
try:
profile = get_profile_registry().get(profile_id)
except Exception:
raise HTTPException(status_code=404, detail="Profile not found")
return {
"id": profile.id,
"name": profile.name,
"description": profile.description,
"short_description": profile.short_description,
"full_description": profile.full_description,
"system_prompt": profile.system_prompt,
"subagent_system_prompt": profile.subagent_system_prompt,
"llm_backend": profile.llm_backend,
"model": profile.model,
"temperature": profile.temperature,
"top_k": profile.top_k,
"top_p": profile.top_p,
"num_thread": profile.num_thread,
"max_iterations": profile.max_iterations,
"planning_enabled": profile.planning_enabled,
"planning_mandatory": profile.planning_mandatory,
"planning_phase1_enabled": profile.planning_phase1_enabled,
"planning_phase2_enabled": profile.planning_phase2_enabled,
"planning_phase3_enabled": profile.planning_phase3_enabled,
"think_enabled": profile.think_enabled,
"iteration_budget_enabled": profile.iteration_budget_enabled,
"goal_anchoring_enabled": profile.goal_anchoring_enabled,
"goal_anchoring_interval": profile.goal_anchoring_interval,
"anti_stall_enabled": profile.anti_stall_enabled,
"anti_stall_threshold": profile.anti_stall_threshold,
"step_validation_enabled": profile.step_validation_enabled,
"adaptive_replan_enabled": profile.adaptive_replan_enabled,
"subagent_tools": profile.subagent_tools,
"subagent_planning_enabled": profile.subagent_planning_enabled,
"subagent_think_enabled": profile.subagent_think_enabled,
"enabled_tools": profile.enabled_tools,
"context_providers": profile.context_providers,
"is_admin_only": getattr(profile, "is_admin_only", False),
}
@router.put("/profiles/{profile_id}")
async def admin_update_profile(
profile_id: str,
body: dict,
user: Annotated[User, Depends(require_permission("navi.profiles.manage"))],
):
"""Update profile configuration on disk and in-memory.
Accepts a partial update — only provided fields are modified.
Writes config.json and system_prompt.txt back to disk.
"""
from pathlib import Path
from navi.api.deps import get_profile_registry
from navi.profiles.base import AgentProfile
from navi.profiles.loader import save_profile_to_dir
registry = get_profile_registry()
try:
old_profile = registry.get(profile_id)
except Exception:
raise HTTPException(status_code=404, detail="Profile not found")
# Build updated profile from existing data + body overrides
updated_data = old_profile.model_dump()
updated_data.update(body)
# Preserve fields that must not change via this endpoint
updated_data["id"] = profile_id
updated_data.setdefault("name", old_profile.name)
updated_data.setdefault("description", old_profile.description)
updated_data.setdefault("enabled_tools", old_profile.enabled_tools)
updated_data.setdefault("system_prompt", old_profile.system_prompt)
try:
updated_profile = AgentProfile.model_validate(updated_data)
except Exception as e:
raise HTTPException(status_code=400, detail=f"Invalid profile data: {e}") from e
# Preserve is_admin_only (not part of base model validation)
if hasattr(old_profile, "is_admin_only"):
updated_profile.is_admin_only = old_profile.is_admin_only
# Write to disk
profiles_dir = Path(__file__).parent.parent.parent / "profiles"
save_profile_to_dir(updated_profile, profiles_dir)
# Update in-memory registry
registry.update(updated_profile)
log.info("admin.profile_updated", profile_id=profile_id, admin_id=user.id)
return {"ok": True}
# ── MCP administration ──────────────────────────────────────────────────────
@router.get("/mcp/config")
async def admin_get_mcp_config(
user: Annotated[User, Depends(require_admin)],
) -> dict:
"""Return the current MCP server configurations from mcp_servers.d/."""
from navi.mcp.config import load_mcp_servers
configs = load_mcp_servers()
return {name: cfg.model_dump() for name, cfg in configs.items()}
@router.put("/mcp/config")
async def admin_update_mcp_config(
body: dict,
user: Annotated[User, Depends(require_admin)],
) -> dict:
"""Replace all MCP server configurations (bulk update)."""
from navi.mcp.config import McpServerConfig, save_mcp_servers
validated: dict[str, McpServerConfig] = {}
for name, cfg_data in body.items():
try:
validated[name] = McpServerConfig.model_validate(cfg_data)
except Exception as exc:
raise HTTPException(
status_code=400,
detail=f"Invalid config for server '{name}': {exc}",
) from exc
save_mcp_servers(validated)
log.info("admin.mcp_config_updated", admin_id=user.id)
return {"ok": True}
@router.get("/mcp/config/{server_name}")
async def admin_get_single_mcp_config(
server_name: str,
user: Annotated[User, Depends(require_admin)],
) -> dict:
"""Return the configuration for a single MCP server."""
from navi.mcp.config import load_mcp_servers
configs = load_mcp_servers()
cfg = configs.get(server_name)
if cfg is None:
raise HTTPException(status_code=404, detail=f"MCP server '{server_name}' not found")
return cfg.model_dump()
@router.put("/mcp/config/{server_name}")
async def admin_update_single_mcp_config(
server_name: str,
body: dict,
user: Annotated[User, Depends(require_admin)],
) -> dict:
"""Create or update a single MCP server configuration."""
from navi.mcp.config import McpServerConfig, load_mcp_servers, save_mcp_servers
try:
validated = McpServerConfig.model_validate(body)
except Exception as exc:
raise HTTPException(
status_code=400,
detail=f"Invalid config for server '{server_name}': {exc}",
) from exc
configs = load_mcp_servers()
configs[server_name] = validated
save_mcp_servers(configs)
log.info("admin.mcp_config_updated_single", server=server_name, admin_id=user.id)
return {"ok": True}
@router.delete("/mcp/config/{server_name}")
async def admin_delete_single_mcp_config(
server_name: str,
user: Annotated[User, Depends(require_admin)],
) -> dict:
"""Remove a single MCP server configuration."""
from navi.mcp.config import load_mcp_servers, save_mcp_servers
configs = load_mcp_servers()
if server_name not in configs:
raise HTTPException(status_code=404, detail=f"MCP server '{server_name}' not found")
del configs[server_name]
save_mcp_servers(configs)
log.info("admin.mcp_config_deleted_single", server=server_name, admin_id=user.id)
return {"ok": True}
@router.post("/mcp/{server_name}/reconnect")
async def admin_reconnect_mcp_server(
server_name: str,
user: Annotated[User, Depends(require_admin)],
) -> dict:
"""Disconnect and reconnect a single MCP server, then re-register its tools."""
from navi.api.deps import _mcp_manager, get_tool_registry
from navi.mcp.client import McpClient
from navi.mcp.config import load_mcp_servers
from navi.mcp.tools import McpTool
if _mcp_manager is None:
raise HTTPException(status_code=503, detail="MCP manager not initialized")
configs = load_mcp_servers(_mcp_manager.config_path)
cfg = configs.get(server_name)
if cfg is None:
raise HTTPException(
status_code=404, detail=f"MCP server '{server_name}' not found in config"
)
# Drop old client
old_client = _mcp_manager.clients.pop(server_name, None)
if old_client:
try:
await old_client.disconnect()
except Exception:
pass
# Unregister old tools for this server
from navi.mcp.tools import build_mcp_name
tool_registry = get_tool_registry()
prefix = build_mcp_name(server_name, "")
for name in list(tool_registry._external_names):
if name.startswith(prefix):
tool_registry.unregister_external(name)
# Connect fresh
client = McpClient(server_name, cfg)
try:
await client.connect()
_mcp_manager.clients[server_name] = client
except Exception as exc:
log.warning("admin.mcp_reconnect_failed", server=server_name, error=str(exc))
raise HTTPException(
status_code=502, detail=f"Reconnect failed: {exc}"
) from exc
# Re-register tools
try:
tools = await client.list_tools()
for tool in tools:
mcp_tool = McpTool(
server_name=server_name,
tool_name=tool.name,
description=tool.description or "",
parameters=tool.inputSchema,
manager=_mcp_manager,
)
tool_registry.register_external(mcp_tool)
except Exception as exc:
log.warning(
"admin.mcp_list_tools_failed_after_reconnect",
server=server_name,
error=str(exc),
)
log.info("admin.mcp_reconnected", server=server_name, admin_id=user.id)
return {"ok": True}
@router.get("/mcp/status")
async def admin_get_mcp_status(
user: Annotated[User, Depends(require_admin)],
) -> dict:
"""Return connection status and tool counts for every configured MCP server."""
from navi.api.deps import _mcp_manager
from navi.mcp.config import load_mcp_servers
configs = load_mcp_servers()
manager = _mcp_manager
servers = []
for name, cfg in configs.items():
client = manager.clients.get(name) if manager else None
status: dict = {
"name": name,
"transport": cfg.transport,
"connected": client.connected if client else False,
"tool_count": 0,
"instructions": None,
"error": None,
}
if client and client.connected:
status["instructions"] = client.instructions
try:
tools = await client.list_tools()
status["tool_count"] = len(tools)
except Exception as exc:
status["error"] = str(exc)
servers.append(status)
return {"servers": servers}
@router.post("/mcp/test")
async def admin_test_mcp_tool(
body: dict,
user: Annotated[User, Depends(require_admin)],
) -> dict:
"""Execute a single MCP tool call for testing/diagnostics."""
from navi.api.deps import _mcp_manager
server_name = body.get("server_name")
tool_name = body.get("tool_name")
arguments = body.get("arguments", {})
if not server_name or not tool_name:
raise HTTPException(
status_code=400, detail="server_name and tool_name are required"
)
if _mcp_manager is None:
raise HTTPException(status_code=503, detail="MCP manager not initialized")
try:
output, is_error = await _mcp_manager.call_tool(
server_name, tool_name, arguments
)
except Exception as exc:
raise HTTPException(
status_code=502, detail=f"Tool call failed: {exc}"
) from exc
return {
"server_name": server_name,
"tool_name": tool_name,
"arguments": arguments,
"output": output,
"is_error": is_error,
}
@router.get("/profiles/{profile_id}/mcp")
async def admin_get_profile_mcp(
profile_id: str,
user: Annotated[User, Depends(require_permission("navi.profiles.manage"))],
) -> dict:
"""Return the MCP group mapping for a profile."""
from navi.api.deps import get_profile_registry
try:
profile = get_profile_registry().get(profile_id)
except Exception:
raise HTTPException(status_code=404, detail="Profile not found")
return {"profile_id": profile_id, "mcp_servers": profile.mcp_servers}
@router.put("/profiles/{profile_id}/mcp")
async def admin_update_profile_mcp(
profile_id: str,
body: dict,
user: Annotated[User, Depends(require_permission("navi.profiles.manage"))],
) -> dict:
"""Update the MCP group mapping for a profile and persist to disk."""
from pathlib import Path
from navi.api.deps import get_profile_registry
from navi.profiles.loader import save_profile_to_dir
try:
profile = get_profile_registry().get(profile_id)
except Exception:
raise HTTPException(status_code=404, detail="Profile not found")
mcp_servers = body.get("mcp_servers")
if mcp_servers is None:
raise HTTPException(status_code=400, detail="mcp_servers is required")
if not isinstance(mcp_servers, dict):
raise HTTPException(status_code=400, detail="mcp_servers must be an object")
for srv_name, groups in mcp_servers.items():
if not isinstance(groups, list):
raise HTTPException(
status_code=400,
detail=f"Groups for server '{srv_name}' must be a list",
)
if not all(isinstance(g, str) for g in groups):
raise HTTPException(
status_code=400,
detail=f"Groups for server '{srv_name}' must be strings",
)
profile.mcp_servers = mcp_servers
profiles_dir = Path(__file__).parent.parent.parent / "profiles"
save_profile_to_dir(profile, profiles_dir)
get_profile_registry().update(profile)
log.info("admin.profile_mcp_updated", profile_id=profile_id, admin_id=user.id)
return {"ok": True}
@router.get("/recalls")
async def admin_list_recalls(
user: Annotated[User, Depends(require_admin)],
scheduler: Annotated[RecallScheduler, Depends(get_scheduler)],
limit: int = 50,
offset: int = 0,
session_id: str | None = None,
user_id: str | None = None,
):
"""Return all scheduled recalls with pagination and filtering."""
recalls = await scheduler.list_recalls(
session_id=session_id,
user_id=user_id,
is_admin=True,
limit=limit,
offset=offset,
)
return {
"limit": limit,
"offset": offset,
"items": [
{
"id": r.id,
"session_id": r.session_id,
"call_type": r.call_type,
"trigger_at": r.trigger_at.isoformat(),
"interval_seconds": r.interval_seconds,
"internal_comment": r.internal_comment,
"additional_context_message": r.additional_context_message,
"status": r.status,
"created_at": r.created_at.isoformat(),
"updated_at": r.updated_at.isoformat(),
}
for r in recalls
],
}