Newer
Older
navi-1 / navi / main.py
@Eugene Sukhodolskiy Eugene Sukhodolskiy on 15 May 6 KB Add self-recall (scheduled callback) system
"""FastAPI application entry point."""

import asyncio
from pathlib import Path

import logging

import structlog
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles

from navi.api.routes import agents, auth, health, messages, sessions, webhooks
from navi.api.routes.admin import router as admin_router
from navi.api.websocket import router as ws_router
from navi.config import settings
from debug.eval.api import router as eval_router

# Drop log events below the configured level. The level name is upper-cased
# before the lookup because the ``logging`` level constants are upper-case
# (``logging.INFO``); a lower-case name such as "info" would otherwise resolve
# to the ``logging.info`` function instead of a numeric level.
structlog.configure(
    wrapper_class=structlog.make_filtering_bound_logger(
        getattr(logging, settings.log_level.upper())
    ),
)

# The ASGI application object. The middleware, routers, static mounts and
# startup/shutdown hooks defined below are all attached to this instance.
app = FastAPI(
    title="Navi",
    description="Modular agent system — REST API and WebSocket",
    version="0.1.0",
)

# CORS: every origin, method and header is allowed, with credentials enabled.
# NOTE(review): FastAPI's CORS documentation says origins, methods and headers
# must be listed explicitly when allow_credentials is True. Confirm this fully
# permissive setup is intended; if the API relies on cookies, restrict
# allow_origins to the known front-end origins.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Module-level references to the background tasks created in _on_startup().
# The event loop keeps only weak references to tasks, so holding them here
# prevents them from being garbage-collected while running and lets the
# shutdown hook reach them.
_cleanup_task = None
_scheduler_task = None

# Attach every API router to the app, in the order listed.
for _router in (
    health.router,
    auth.router,
    agents.router,
    sessions.router,
    messages.router,
    ws_router,
    webhooks.router,
    admin_router,
    eval_router,
):
    app.include_router(_router)
del _router  # do not leave the loop variable behind as a module attribute

# Directory two levels above this file; static assets are resolved against it.
_base = Path(__file__).parent.parent

# (URL prefix, directory on disk, mount name) for each static mount, in mount order.
for _prefix, _directory, _mount_name in (
    ("/assets", _base / "webclient" / "dist" / "assets", "assets"),
    ("/images", _base / "webclient" / "dist" / "images", "images"),
    ("/content-viewers", _base / "webclient" / "dist" / "content-viewers", "content_viewers"),
    ("/content", _base / "navi" / "content", "content"),
):
    app.mount(_prefix, StaticFiles(directory=str(_directory)), name=_mount_name)
del _prefix, _directory, _mount_name  # keep only _base at module level


@app.on_event("startup")
async def _on_startup() -> None:
    """Prepare the application when the server starts.

    Steps, in order:

    1. Run ``_ensure_auth_tables()`` and ``ensure_tables()``, retrying up to
       five times with a two-second pause between attempts. If the fifth
       attempt also fails, the error is logged and startup continues.
    2. Build the registries via ``get_registries()``.
    3. Connect MCP servers, register their tools, and hand the MCP manager to
       the listed built-in tools that have a ``_mcp_manager`` attribute.
    4. Apply persisted profile overrides to the in-memory profiles.
    5. Check the embedding backend and log whether it is available.
    6. Start the session-file cleanup and scheduled-recall background tasks.

    Failures in steps 3 and 4 are logged as warnings and do not abort startup.
    """
    log = structlog.get_logger()
    # Project imports are done inside the function rather than at module level
    # (presumably to avoid import cycles or import-time side effects — TODO confirm).
    from navi.api.deps import get_registries, get_session_store
    from navi.content_store import ensure_tables
    from navi.session_files import cleanup_loop
    from navi.auth import _ensure_auth_tables
    # Ensure auth tables first (navi_users is referenced by other DDL).
    # Retry for race with Docker compose.
    for attempt in range(1, 6):
        try:
            await _ensure_auth_tables()
            await ensure_tables()
            break
        except Exception as e:
            if attempt < 5:
                log.warning("startup.ensure_tables_retry", attempt=attempt, error=str(e))
                await asyncio.sleep(2)
            else:
                # Last attempt: log and fall through; startup is not aborted.
                log.error("startup.ensure_tables_failed", error=str(e))
    # Initialize registries before embed health check. The memory store gets its
    # embedding backend wired during registry construction.
    get_registries()
    # Connect MCP servers and register their tools as external.
    from navi.api.deps import get_mcp_manager, get_tool_registry, register_mcp_tools
    try:
        mcp_manager = await get_mcp_manager()
        tool_registry = get_tool_registry()
        await register_mcp_tools(tool_registry, mcp_manager)
        # Give the MCP manager to the built-in tools that carry a
        # `_mcp_manager` attribute; tools without it are left untouched.
        for tool_name in ("reload_tools", "mcp_status", "test_mcp_tool", "spawn_agent", "list_tools"):
            tool = tool_registry.get(tool_name)
            if hasattr(tool, "_mcp_manager"):
                tool._mcp_manager = mcp_manager
    except Exception:
        # Best effort: any failure in this section is logged and startup goes on.
        log.warning("startup.mcp_connect_failed", exc_info=True)
    # Apply persisted profile overrides (e.g. is_admin_only) to in-memory profiles.
    from navi.api.deps import get_profile_registry, get_session_store
    from navi.profiles._overrides import ensure_table, load_overrides

    try:
        pool = await get_session_store()._get_pool()
        await ensure_table(pool)
        overrides = await load_overrides(pool)
        if overrides:
            profiles = get_profile_registry()
            # overrides maps profile id -> is_admin_only flag
            for pid, is_admin_only in overrides.items():
                try:
                    profile = profiles.get(pid)
                    profile.is_admin_only = is_admin_only
                except Exception:
                    pass  # stale override for removed profile
            log.info("startup.profile_overrides_applied", count=len(overrides))
    except Exception:
        # Best effort: a failure here leaves the in-memory profiles as they are.
        log.warning("startup.profile_overrides_failed", exc_info=True)
    # Check embedding backend health and log status
    from navi.api.routes.health import _check_embed

    # Not wrapped in try/except: an exception raised here propagates out of startup.
    embed_status = await _check_embed()
    if embed_status["ok"]:
        log.info("startup.embed_ready", backend=embed_status["backend"])
    else:
        log.warning("startup.embed_unavailable", backend=embed_status["backend"], error=embed_status["error"])
    # Start session file cleanup background task
    global _cleanup_task
    _cleanup_task = asyncio.create_task(cleanup_loop(get_session_store()))
    # Start scheduled recall background task
    global _scheduler_task
    from navi.core.scheduler import recall_scheduler_loop
    from navi.api.deps import get_scheduler, get_session_store as _get_store
    _scheduler_task = asyncio.create_task(recall_scheduler_loop(get_scheduler(), _get_store()))


@app.on_event("shutdown")
async def _on_shutdown() -> None:
    """Release resources when the server shuts down.

    Closes SSH connections, disconnects the MCP manager if one was created,
    then cancels both background tasks started in ``_on_startup`` (session
    file cleanup and scheduled recall) and waits for them to finish.

    Errors from the MCP disconnect or from a background task are logged and
    do not stop the remaining shutdown steps.
    """
    from navi.tools.ssh_exec import close_all_connections
    from navi.api.deps import _mcp_manager

    global _cleanup_task, _scheduler_task
    log = structlog.get_logger()

    close_all_connections()
    if _mcp_manager is not None:
        try:
            await _mcp_manager.disconnect_all()
        except (asyncio.CancelledError, RuntimeError):
            # Deliberately ignored, as before: cancellation / runtime errors
            # raised while tearing down connections during shutdown.
            pass
        except Exception:
            log.warning("shutdown.mcp_disconnect_failed", exc_info=True)

    # Cancel every background task first, then await each one, so that one
    # slow task does not delay the cancellation of the other.
    tasks = [task for task in (_cleanup_task, _scheduler_task) if task is not None]
    for task in tasks:
        task.cancel()
    for task in tasks:
        try:
            await task
        except asyncio.CancelledError:
            pass
        except Exception:
            # The task had already failed on its own; report it instead of
            # letting the exception escape the shutdown hook.
            log.warning("shutdown.background_task_failed", exc_info=True)
    _cleanup_task = None
    _scheduler_task = None


@app.get("/", include_in_schema=False)
async def index() -> FileResponse:
    """Serve the web client's ``index.html`` with ``Cache-Control: no-store``."""
    index_html = _base / "webclient" / "dist" / "index.html"
    no_store = {"Cache-Control": "no-store"}
    return FileResponse(str(index_html), headers=no_store)


@app.get("/debug", include_in_schema=False)
async def debug() -> FileResponse:
    """Serve the debug page (``debug/index.html``) with ``Cache-Control: no-store``.

    The path is anchored at ``_base``, as ``index()`` does, so the file is
    found regardless of the process's working directory.
    """
    return FileResponse(str(_base / "debug" / "index.html"), headers={"Cache-Control": "no-store"})


@app.get("/debug/eval", include_in_schema=False)
async def debug_eval() -> FileResponse:
    """Serve the eval debug page (``debug/eval/index.html``) with ``Cache-Control: no-store``.

    The path is anchored at ``_base``, as ``index()`` does, so the file is
    found regardless of the process's working directory.
    """
    return FileResponse(
        str(_base / "debug" / "eval" / "index.html"),
        headers={"Cache-Control": "no-store"},
    )


@app.get("/admin", include_in_schema=False)
async def admin_panel() -> FileResponse:
    """Serve the admin panel page (``admin/index.html``) with ``Cache-Control: no-store``.

    The path is anchored at ``_base``, as ``index()`` does, so the file is
    found regardless of the process's working directory.
    NOTE(review): assumes the ``admin`` directory sits next to ``webclient``
    under ``_base`` — confirm against the repository layout.
    """
    return FileResponse(str(_base / "admin" / "index.html"), headers={"Cache-Control": "no-store"})