"""FastAPI application entry point."""
import asyncio
from pathlib import Path
import logging
import structlog
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
from navi.api.routes import agents, auth, health, messages, sessions, webhooks
from navi.api.routes.admin import router as admin_router
from navi.api.websocket import router as ws_router
from navi.config import settings
from debug.eval.api import router as eval_router
# Drop log records below the configured level.
# The level name is upper-cased before the lookup: a lower-case value such as
# "info" would otherwise resolve to the ``logging.info`` *function* instead of
# the ``logging.INFO`` integer constant.  Unknown names still raise
# AttributeError at import time, exactly as before.
structlog.configure(
    wrapper_class=structlog.make_filtering_bound_logger(
        getattr(logging, settings.log_level.upper())
    ),
)
# The application object; routers, middleware, static mounts and the
# startup/shutdown hooks below are all attached to it.
app = FastAPI(
    version="0.1.0",
    title="Navi",
    description="Modular agent system — REST API and WebSocket",
)
# CORS: every origin, method and header is accepted, and credentials are
# allowed.
# NOTE(review): wildcard origins combined with allow_credentials=True is a
# very permissive policy — confirm this is intended outside development.
app.add_middleware(
    CORSMiddleware,
    allow_credentials=True,
    allow_origins=["*"],
    allow_headers=["*"],
    allow_methods=["*"],
)
# Module-level handles for the background tasks created in the startup hook.
# Holding a strong reference keeps a running task from being garbage-collected
# (the event loop itself only keeps weak references to tasks) and lets the
# shutdown hook find the tasks again.  Both stay None until startup has run.
_cleanup_task = _scheduler_task = None
# Register every API router on the application, in this order.
for _router in (
    health.router,
    auth.router,
    agents.router,
    sessions.router,
    messages.router,
    ws_router,
    webhooks.router,
    admin_router,
    eval_router,
):
    app.include_router(_router)
del _router  # loop variable only; keep it out of the module namespace
# Directory two levels above this file; all served files are located from it.
_base = Path(__file__).parent.parent

# (URL prefix, directory on disk, mount name) — mounted in this order.
for _prefix, _directory, _mount_name in (
    ("/assets", _base / "webclient" / "dist" / "assets", "assets"),
    ("/images", _base / "webclient" / "dist" / "images", "images"),
    ("/content-viewers", _base / "webclient" / "dist" / "content-viewers", "content_viewers"),
    ("/content", _base / "navi" / "content", "content"),
):
    app.mount(_prefix, StaticFiles(directory=str(_directory)), name=_mount_name)
del _prefix, _directory, _mount_name  # loop variables only
async def _ensure_tables_with_retry(log, attempts: int = 5, delay: float = 2) -> bool:
    """Create the auth and content tables, retrying on failure.

    Auth tables are created first (navi_users is referenced by other DDL).
    The retry covers the race with Docker compose, where the database may not
    be reachable yet.

    Args:
        log: structlog logger used for retry/failure events.
        attempts: total number of tries before giving up.
        delay: seconds to sleep between tries.

    Returns:
        True once both calls succeeded, False if every attempt failed.  A
        final failure is logged but not raised, so startup continues.
    """
    from navi.auth import _ensure_auth_tables
    from navi.content_store import ensure_tables

    for attempt in range(1, attempts + 1):
        try:
            await _ensure_auth_tables()
            await ensure_tables()
        except Exception as exc:
            if attempt == attempts:
                log.error("startup.ensure_tables_failed", error=str(exc))
                return False
            log.warning("startup.ensure_tables_retry", attempt=attempt, error=str(exc))
            await asyncio.sleep(delay)
        else:
            return True
    return False


async def _connect_mcp_tools(log) -> None:
    """Connect MCP servers and register their tools as external.

    Any failure is logged as a warning and otherwise ignored (best effort).
    """
    from navi.api.deps import get_mcp_manager, get_tool_registry, register_mcp_tools

    try:
        mcp_manager = await get_mcp_manager()
        tool_registry = get_tool_registry()
        await register_mcp_tools(tool_registry, mcp_manager)
        # Hand the manager to the built-in tools that expose a `_mcp_manager` slot.
        for tool_name in ("reload_tools", "mcp_status", "test_mcp_tool", "spawn_agent", "list_tools"):
            tool = tool_registry.get(tool_name)
            if hasattr(tool, "_mcp_manager"):
                tool._mcp_manager = mcp_manager
    except Exception:
        log.warning("startup.mcp_connect_failed", exc_info=True)


async def _apply_profile_overrides(log) -> None:
    """Apply persisted profile overrides (e.g. is_admin_only) to in-memory profiles.

    Any failure is logged as a warning and otherwise ignored (best effort).
    """
    from navi.api.deps import get_profile_registry, get_session_store
    from navi.profiles._overrides import ensure_table, load_overrides

    try:
        pool = await get_session_store()._get_pool()
        await ensure_table(pool)
        overrides = await load_overrides(pool)
        if overrides:
            profiles = get_profile_registry()
            for pid, is_admin_only in overrides.items():
                try:
                    profiles.get(pid).is_admin_only = is_admin_only
                except Exception:
                    # Stale override for a removed profile: skip it, but leave
                    # a trace instead of discarding the error silently.
                    log.debug("startup.profile_override_skipped", profile_id=pid)
            log.info("startup.profile_overrides_applied", count=len(overrides))
    except Exception:
        log.warning("startup.profile_overrides_failed", exc_info=True)


async def _log_embed_status(log) -> None:
    """Check the embedding backend's health and log the result."""
    from navi.api.routes.health import _check_embed

    embed_status = await _check_embed()
    if embed_status["ok"]:
        log.info("startup.embed_ready", backend=embed_status["backend"])
    else:
        log.warning(
            "startup.embed_unavailable",
            backend=embed_status["backend"],
            error=embed_status["error"],
        )


def _log_task_failure(task: asyncio.Task) -> None:
    """Done-callback: log a background task that ended with an exception.

    Nothing awaits the background tasks before shutdown, so without this a
    crash in one of the loops would go unreported while the app is running.
    """
    if task.cancelled():
        return
    exc = task.exception()
    if exc is not None:
        structlog.get_logger().error(
            "background_task.crashed",
            task=task.get_name(),
            error=str(exc),
            exc_info=exc,
        )


@app.on_event("startup")
async def _on_startup() -> None:
    """Initialise the application when the server starts.

    Steps, in order: ensure database tables, build the registries, connect MCP
    tools, apply persisted profile overrides, log the embedding backend's
    health, then launch the session-file cleanup and recall scheduler loops as
    background tasks (stored in ``_cleanup_task`` / ``_scheduler_task``).
    """
    global _cleanup_task, _scheduler_task

    from navi.api.deps import get_registries, get_scheduler, get_session_store
    from navi.core.scheduler import recall_scheduler_loop
    from navi.session_files import cleanup_loop

    log = structlog.get_logger()

    await _ensure_tables_with_retry(log)

    # Initialize registries before embed health check. The memory store gets
    # its embedding backend wired during registry construction.
    get_registries()

    await _connect_mcp_tools(log)
    await _apply_profile_overrides(log)
    await _log_embed_status(log)

    # Start session file cleanup background task.
    _cleanup_task = asyncio.create_task(
        cleanup_loop(get_session_store()), name="session-file-cleanup"
    )
    _cleanup_task.add_done_callback(_log_task_failure)

    # Start scheduled recall background task.
    _scheduler_task = asyncio.create_task(
        recall_scheduler_loop(get_scheduler(), get_session_store()), name="recall-scheduler"
    )
    _scheduler_task.add_done_callback(_log_task_failure)
@app.on_event("shutdown")
async def _on_shutdown() -> None:
    """Release external connections and stop the background tasks.

    Closes all SSH connections, disconnects the MCP manager if one exists,
    then cancels both background tasks and waits for them to finish.  Errors
    during teardown are logged rather than raised so that one failing step
    does not prevent the remaining ones from running.
    """
    from navi.tools.ssh_exec import close_all_connections
    from navi.api.deps import _mcp_manager

    log = structlog.get_logger()

    close_all_connections()

    if _mcp_manager is not None:
        try:
            await _mcp_manager.disconnect_all()
        except (asyncio.CancelledError, RuntimeError):
            # Deliberately silent, as before.
            # NOTE(review): presumably raised while the MCP transports are
            # torn down during loop shutdown — confirm.
            pass
        except Exception:
            log.warning("shutdown.mcp_disconnect_failed", exc_info=True)

    # Cancel the scheduler task and the cleanup task (the latter used to be
    # left running at shutdown).
    for task in (_scheduler_task, _cleanup_task):
        if task is None:
            continue
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass
        except Exception:
            # The task had already crashed; awaiting it re-raises that error.
            log.warning("shutdown.background_task_failed", exc_info=True)
from fastapi.responses import RedirectResponse
@app.get("/sessions/{session_id}/files/{filename}", include_in_schema=False)
async def legacy_session_file_redirect(session_id: str, filename: str, download: bool = False):
    """Permanently redirect the legacy session-file URL to its ``/api`` location.

    Args:
        session_id: session identifier taken from the path.
        filename: file name taken from the path.
        download: when true, ``?download=1`` is appended to the target.

    Returns:
        A 301 redirect to ``/api/sessions/{session_id}/files/{filename}``.
    """
    from urllib.parse import quote

    # Path parameters arrive percent-decoded.  Re-encode each segment so that
    # characters such as "?" or "#" in a file name stay part of the path
    # instead of starting a query string or fragment in the Location URL.
    target = f"/api/sessions/{quote(session_id, safe='')}/files/{quote(filename, safe='')}"
    if download:
        target += "?download=1"
    return RedirectResponse(url=target, status_code=301)
@app.get("/", include_in_schema=False)
async def index() -> FileResponse:
    """Serve the web client's ``index.html`` with caching disabled."""
    index_html = _base / "webclient" / "dist" / "index.html"
    no_store = {"Cache-Control": "no-store"}
    return FileResponse(str(index_html), headers=no_store)
@app.get("/debug", include_in_schema=False)
async def debug() -> FileResponse:
    """Serve the debug page with caching disabled."""
    # Relative path: unlike the index route it is not anchored at `_base`,
    # so it is resolved against the process working directory.
    no_store = {"Cache-Control": "no-store"}
    return FileResponse("debug/index.html", headers=no_store)
@app.get("/debug/eval", include_in_schema=False)
async def debug_eval() -> FileResponse:
    """Serve the eval debug page with caching disabled."""
    # Relative path: resolved against the process working directory.
    no_store = {"Cache-Control": "no-store"}
    return FileResponse("debug/eval/index.html", headers=no_store)
@app.get("/admin", include_in_schema=False)
async def admin_panel() -> FileResponse:
    """Serve the admin panel page with caching disabled."""
    # Relative path: resolved against the process working directory.
    no_store = {"Cache-Control": "no-store"}
    return FileResponse("admin/index.html", headers=no_store)