Newer
Older
navi-1 / navi / api / routes / sessions.py
@Eugene Sukhodolskiy Eugene Sukhodolskiy on 9 Apr 5 KB Add long-term user memory system
"""Session management endpoints."""

import asyncio
from datetime import datetime, timedelta, timezone
from typing import Annotated

import structlog
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel

from navi.api.deps import get_memory_store, get_profile_registry, get_session_store
from navi.core import ProfileRegistry, SessionStore
from navi.exceptions import ProfileNotFound
from navi.memory import MemoryStore

# Module-level structlog logger used by the code in this file.
log = structlog.get_logger()

# Router for the session endpoints: every route declared on it lives under the
# "/sessions" path prefix and carries the "sessions" tag.
router = APIRouter(prefix="/sessions", tags=["sessions"])


class CreateSessionRequest(BaseModel):
    """Request body accepted by the session-creation endpoint."""

    # Identifier of the profile the new session is created for; the handler
    # looks it up in the profile registry before creating the session.
    profile_id: str


class PinSessionRequest(BaseModel):
    """Request body accepted by the pin/unpin endpoint."""

    # Desired pinned state; passed unchanged to the session store.
    pinned: bool


# Strong references to in-flight background tasks. The event loop only keeps
# weak references to tasks, so a task nobody else references may be garbage
# collected before it finishes; each task removes itself from this set when done.
_background_tasks: set[asyncio.Task] = set()


@router.post("", status_code=201)
async def create_session(
    body: CreateSessionRequest,
    store: Annotated[SessionStore, Depends(get_session_store)],
    profiles: Annotated[ProfileRegistry, Depends(get_profile_registry)],
    memory: Annotated[MemoryStore, Depends(get_memory_store)],
) -> dict:
    """Create a new session for an existing profile.

    After the session is created, a background task is scheduled that runs
    memory extraction over stale sessions; the response does not wait for it.

    Args:
        body: Request payload carrying the profile identifier.
        store: Session store used to create the session.
        profiles: Registry used to check that the profile exists.
        memory: Memory store handed to the background extraction task.

    Returns:
        A dict with ``session_id``, ``profile_id`` and ``created_at``
        (ISO-8601 string) of the newly created session.

    Raises:
        HTTPException: 404 when the profile registry raises ``ProfileNotFound``.
    """
    try:
        profiles.get(body.profile_id)
    except ProfileNotFound:
        # The HTTP error fully describes the failure; drop the internal cause.
        raise HTTPException(
            status_code=404, detail=f"Profile '{body.profile_id}' not found"
        ) from None

    session = await store.create(body.profile_id)

    # Fire-and-forget: extract memory from any stale sessions (last active > 30 min ago).
    # Keep a reference until the task completes so it cannot be collected mid-run.
    task = asyncio.create_task(_process_stale_sessions(store, memory))
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)

    return {
        "session_id": session.id,
        "profile_id": session.profile_id,
        "created_at": session.created_at.isoformat(),
    }


async def _process_stale_sessions(
    store: SessionStore,
    memory: MemoryStore,
    *,
    stale_after: timedelta = timedelta(minutes=30),
) -> None:
    """Extract facts from idle sessions that haven't been processed yet.

    A session is processed when it has messages, its ``last_active`` is older
    than ``stale_after`` and the memory store has no extraction marker at or
    after that ``last_active``. This is a best-effort background job: failures
    are logged and never propagated to the caller.

    Args:
        store: Session store whose sessions are scanned.
        memory: Memory store that records extraction markers and receives
            the extracted facts.
        stale_after: How long a session must have been idle to count as stale.
            Defaults to 30 minutes.
    """
    # Imported lazily inside the function, as in the original code
    # (presumably to avoid import cycles -- TODO confirm).
    from navi.api.deps import get_backend_registry
    from navi.config import settings
    from navi.memory.extractor import extract_and_update

    cutoff = datetime.now(timezone.utc) - stale_after
    try:
        sessions = await store.list_all()
        backend = get_backend_registry().get("ollama")
    except Exception:
        # Nothing can be processed without the session list or the backend;
        # give up, but leave a trace instead of failing silently.
        log.warning("memory.stale_scan_failed", exc_info=True)
        return

    for session in sessions:
        if not session.messages:
            continue
        if session.last_active >= cutoff:
            continue  # still active
        try:
            # The marker lookup sits inside the try so that one failing session
            # does not abort the scan of the remaining ones.
            extracted_at = await memory.get_extracted_at(session.id)
            # NOTE(review): the marker is compared against an ISO-8601 string, so
            # it is presumably stored in that same format -- confirm in MemoryStore.
            if extracted_at and extracted_at >= session.last_active.isoformat():
                continue  # already up to date
            await extract_and_update(session, backend, settings.ollama_default_model, memory)
        except Exception:
            log.warning("memory.extraction_failed", session_id=session.id, exc_info=True)


@router.get("")
async def list_sessions(
    store: Annotated[SessionStore, Depends(get_session_store)],
) -> list[dict]:
    """Return one summary dict per session known to the store.

    Each entry holds the session and profile ids, the number of messages,
    a short preview snippet, the pinned flag and both timestamps as
    ISO-8601 strings.
    """
    summaries = []
    for item in await store.list_all():
        summaries.append(
            {
                "session_id": item.id,
                "profile_id": item.profile_id,
                "message_count": len(item.messages),
                "preview": _preview(item),
                "pinned": item.pinned,
                "created_at": item.created_at.isoformat(),
                "last_active": item.last_active.isoformat(),
            }
        )
    return summaries


def _preview(session) -> str:
    """Give the first 60 characters of the newest non-empty user/assistant message.

    Messages are scanned from the end; entries with another role or with
    empty content are skipped. An empty string is returned when none qualifies.
    """
    snippets = (
        entry.content[:60]
        for entry in reversed(session.messages)
        if entry.role in ("user", "assistant") and entry.content
    )
    return next(snippets, "")


@router.get("/{session_id}")
async def get_session(
    session_id: str,
    store: Annotated[SessionStore, Depends(get_session_store)],
) -> dict:
    """Return a single session together with its serialized messages.

    Raises:
        HTTPException: 404 when the store returns ``None`` for the id.
    """
    found = await store.get(session_id)
    if found is None:
        raise HTTPException(status_code=404, detail="Session not found")

    payload = {"session_id": found.id, "profile_id": found.profile_id}
    # Messages are dumped in JSON mode with ``None`` fields left out.
    payload["messages"] = [
        message.model_dump(mode="json", exclude_none=True) for message in found.messages
    ]
    payload["created_at"] = found.created_at.isoformat()
    payload["last_active"] = found.last_active.isoformat()
    return payload


@router.patch("/{session_id}/pin")
async def pin_session(
    session_id: str,
    body: PinSessionRequest,
    store: Annotated[SessionStore, Depends(get_session_store)],
) -> dict:
    """Set the pinned flag of a session and echo the new state back.

    Raises:
        HTTPException: 404 when the store reports the update as unsuccessful.
    """
    if not await store.set_pinned(session_id, body.pinned):
        raise HTTPException(status_code=404, detail="Session not found")

    response = {"session_id": session_id, "pinned": body.pinned}
    return response


@router.get("/{session_id}/context")
async def get_session_context(
    session_id: str,
    store: Annotated[SessionStore, Depends(get_session_store)],
) -> dict:
    """Expose the session's LLM context (what the model actually sees) for debugging.

    The response reports the number of context messages, the summed length
    of their contents (missing content counts as zero) and the serialized
    messages themselves.

    Raises:
        HTTPException: 404 when the store returns ``None`` for the id.
    """
    found = await store.get(session_id)
    if found is None:
        raise HTTPException(status_code=404, detail="Session not found")

    char_total = 0
    for entry in found.context:
        char_total += len(entry.content) if entry.content else 0

    return {
        "session_id": found.id,
        "profile_id": found.profile_id,
        "message_count": len(found.context),
        "total_chars": char_total,
        "context": [
            entry.model_dump(mode="json", exclude_none=True) for entry in found.context
        ],
    }


@router.delete("/{session_id}", status_code=204)
async def delete_session(
    session_id: str,
    store: Annotated[SessionStore, Depends(get_session_store)],
) -> None:
    """Delete a session; respond with no content on success.

    Raises:
        HTTPException: 404 when the store reports that nothing was deleted.
    """
    if await store.delete(session_id):
        return
    raise HTTPException(status_code=404, detail="Session not found")