Newer
Older
navi-1 / navi / api / routes / sessions.py
"""Session management endpoints."""

import asyncio
from datetime import datetime, timedelta, timezone
from typing import Annotated

import mimetypes

import structlog
from fastapi import APIRouter, Depends, HTTPException, Query, UploadFile
from fastapi.responses import FileResponse
from pydantic import BaseModel

from navi.api.deps import (
    get_backend_registry,
    get_memory_store,
    get_profile_registry,
    get_scheduler,
    get_session_store,
    require_admin,
    require_permission,
    require_user,
)
from navi.auth import User
from navi.auth.deps import check_session_access
from navi.content_store import list_for_session
from navi.config import settings
from navi.core import BackendRegistry, ProfileRegistry, SessionStore
from navi.core.scheduler import RecallScheduler
from navi.core.name_generator import generate_session_name
from navi.exceptions import ProfileNotFound
from navi.memory import MemoryStore
from navi.session_files import delete_session_dir, ensure_session_dir, is_forbidden, list_session_files, safe_filename, session_dir

# Module-level structured logger for this router module.
log = structlog.get_logger()

# Every endpoint below is mounted under the "/sessions" prefix and tagged "sessions".
router = APIRouter(prefix="/sessions", tags=["sessions"])

class CreateSessionRequest(BaseModel):
    # Request body for POST /sessions.
    # ID of the profile the new session is created for; create_session answers
    # 404 if the profile registry does not know it.
    profile_id: str


class PinSessionRequest(BaseModel):
    # Request body for PATCH /sessions/{session_id}/pin.
    # Desired pinned state; passed straight to SessionStore.set_pinned and echoed back.
    pinned: bool


# Strong references to fire-and-forget tasks spawned by this module. The event
# loop keeps only weak references to tasks, so a task nobody holds may be
# garbage-collected before it finishes; each task removes itself when done.
_background_tasks: set[asyncio.Task] = set()


@router.post("", status_code=201)
async def create_session(
    payload: CreateSessionRequest,
    store: Annotated[SessionStore, Depends(get_session_store)],
    profiles: Annotated[ProfileRegistry, Depends(get_profile_registry)],
    memory: Annotated[MemoryStore, Depends(get_memory_store)],
    user: Annotated[User, Depends(require_user)],
) -> dict:
    """Create a new session owned by the current user.

    Responds 404 when ``profile_id`` is not a registered profile. As a side
    effect, schedules a background memory-extraction pass over the user's
    sessions that have been idle for more than 30 minutes.
    """
    try:
        profiles.get(payload.profile_id)
    except ProfileNotFound:
        raise HTTPException(status_code=404, detail=f"Profile '{payload.profile_id}' not found")

    session = await store.create(payload.profile_id, user_id=user.id)

    # Fire-and-forget: extract memory from any stale sessions (last active > 30 min ago).
    # Hold a reference until completion so the task cannot be collected mid-run.
    task = asyncio.create_task(_process_stale_sessions(store, memory, user_id=user.id))
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)

    return {
        "session_id": session.id,
        "profile_id": session.profile_id,
        "created_at": session.created_at.isoformat(),
    }


async def _process_stale_sessions(store: SessionStore, memory: MemoryStore, user_id: str | None = None) -> None:
    """Extract memory facts from idle sessions (best-effort background job).

    A session is processed when it has messages, has an owner
    (``session.user_id`` is set), was last active more than 30 minutes ago,
    and has not been extracted since that last activity.

    Never raises. If listing sessions or looking up the "ollama" backend
    fails, the failure is logged and the run stops. A failure while checking
    or extracting a single session is logged and only that session is skipped.
    """
    # Deferred imports; get_backend_registry and settings shadow the
    # module-level names of the same objects.
    from navi.api.deps import get_backend_registry
    from navi.config import settings
    from navi.memory.extractor import extract_and_update

    cutoff = datetime.now(timezone.utc) - timedelta(minutes=30)
    try:
        sessions = await store.list_all(user_id=user_id, is_admin=False)
        backend = get_backend_registry().get("ollama")
    except Exception:
        log.warning("memory.stale_scan_failed", user_id=user_id, exc_info=True)
        return

    for session in sessions:
        if not session.messages:
            continue
        if session.user_id is None:
            continue  # skip legacy sessions — no multi-user memory for unowned sessions
        # Everything per-session is guarded so one bad session cannot abort the run.
        try:
            if session.last_active >= cutoff:
                continue  # still active
            extracted_at = await memory.get_extracted_at(session.id)
            # NOTE(review): lexicographic comparison of ISO-8601 strings; only
            # correct if get_extracted_at returns the same isoformat() layout
            # (same offset notation / precision) — confirm.
            if extracted_at and extracted_at >= session.last_active.isoformat():
                continue  # already up to date
            await extract_and_update(session, backend, settings.ollama_default_model, memory)
        except Exception:
            log.warning("memory.extraction_failed", session_id=session.id, exc_info=True)


@router.get("")
async def list_sessions(
    store: Annotated[SessionStore, Depends(get_session_store)],
    user: Annotated[User, Depends(require_user)],
    scheduler: Annotated[RecallScheduler, Depends(get_scheduler)],
    limit: Annotated[int | None, Query(ge=1, le=100)] = None,
    offset: Annotated[int | None, Query(ge=0)] = None,
    profile_id: str | None = None,
    search: str | None = Query(None),
) -> dict | list[dict]:
    # Two response shapes:
    #   * neither limit nor offset given -> a bare list of every visible session;
    #   * otherwise -> a page envelope {items, limit, offset, has_more, next_offset}.
    # The profile_id filter is not applied when a search term is present.
    can_read_all = user.role == "admin" or user.has_permission("navi.sessions.read_all")

    paginated = limit is not None or offset is not None
    if not paginated:
        everything = await store.list_all(user_id=user.id, is_admin=can_read_all)
        if profile_id and not search:
            everything = [s for s in everything if s.profile_id == profile_id]
        pending = await scheduler.get_pending_session_ids([s.id for s in everything])
        return [
            _session_summary(s, search=search, has_pending_recall=s.id in pending)
            for s in everything
        ]

    page_size = limit or 30
    start = offset or 0
    # Fetch one extra row: its presence tells us whether another page exists.
    if search:
        fetched = await store.search_list(
            limit=page_size + 1,
            offset=start,
            user_id=user.id,
            is_admin=can_read_all,
            search=search,
        )
    else:
        fetched = await store.list_page(
            limit=page_size + 1,
            offset=start,
            profile_id=profile_id,
            user_id=user.id,
            is_admin=can_read_all,
        )

    page = fetched[:page_size]
    pending = await scheduler.get_pending_session_ids([s.id for s in page])
    items = [_session_summary(s, search=search, has_pending_recall=s.id in pending) for s in page]
    return {
        "items": items,
        "limit": page_size,
        "offset": start,
        "has_more": len(fetched) > page_size,
        "next_offset": start + len(page),
    }


def _session_summary(session, search: str | None = None, has_pending_recall: bool = False) -> dict:
    """Serialize *session* into the compact shape used by the session list.

    When *search* is non-empty, two extra keys are appended:
    ``match_indices`` (positions of messages containing the term,
    case-insensitively) and ``match_preview`` (a snippet around the first hit).
    """
    summary = dict(
        session_id=session.id,
        profile_id=session.profile_id,
        name=session.name,
        message_count=len(session.messages),
        preview=_preview(session),
        pinned=session.pinned,
        has_pending_recall=has_pending_recall,
        created_at=session.created_at.isoformat(),
        last_active=session.last_active.isoformat(),
    )
    if not search:
        return summary
    matched, snippet = _match_info(session, search)
    summary["match_indices"] = matched
    summary["match_preview"] = snippet
    return summary


def _preview(session) -> str:
    """Return at most 60 characters of the newest non-empty user/assistant message.

    Messages with any other role, or with empty/None content, are ignored.
    Returns "" when nothing qualifies.
    """
    newest = next(
        (
            message.content
            for message in reversed(session.messages)
            if message.role in ("user", "assistant") and message.content
        ),
        "",
    )
    return newest[:60]


def _match_info(session, search: str) -> tuple[list[int], str]:
    """Locate messages containing *search* (case-insensitive) and build a snippet.

    Returns ``(indices, preview)``. ``indices`` lists the positions in
    ``session.messages`` whose content (None treated as "") contains the term.
    ``preview`` is up to 30 characters of context on each side of the first
    match, with "…" marking each truncated side; it is "" when nothing matched.
    """
    needle = search.lower()
    hits: list[int] = []
    snippet = ""
    for position, message in enumerate(session.messages):
        text = message.content or ""
        lowered = text.lower()  # computed once per message, reused for find()
        if needle not in lowered:
            continue
        hits.append(position)
        if snippet:
            continue  # only the first non-empty snippet is kept
        offset = lowered.find(needle)
        lo = max(0, offset - 30)
        hi = min(len(text), offset + len(search) + 30)
        prefix = "…" if lo > 0 else ""
        suffix = "…" if hi < len(text) else ""
        snippet = prefix + text[lo:hi] + suffix
    return hits, snippet


@router.get("/{session_id}")
async def get_session(
    session_id: str,
    store: Annotated[SessionStore, Depends(get_session_store)],
    user: Annotated[User, Depends(require_user)],
) -> dict:
    # Full view of one session: metadata, message history and context-window
    # accounting. check_session_access receives "navi.sessions.read_all",
    # presumably letting holders read sessions they do not own — confirm there.
    session = await store.get(session_id)
    if session is None:
        raise HTTPException(status_code=404, detail="Session not found")
    check_session_access(session, user, permission="navi.sessions.read_all")

    history = [msg.model_dump(mode="json", exclude_none=True) for msg in session.messages]
    return {
        "session_id": session.id,
        "profile_id": session.profile_id,
        "name": session.name,
        "messages": history,
        "context_token_count": session.context_token_count,
        # Window size comes from the global Ollama setting, not the session.
        "max_context_tokens": settings.ollama_num_ctx,
        "archive_threshold": session.archive_threshold,
        "created_at": session.created_at.isoformat(),
        "last_active": session.last_active.isoformat(),
    }


@router.patch("/{session_id}/pin")
async def pin_session(
    session_id: str,
    body: PinSessionRequest,
    store: Annotated[SessionStore, Depends(get_session_store)],
    user: Annotated[User, Depends(require_user)],
) -> dict:
    # Set the session's pinned flag to body.pinned and echo it back.
    # 404 when the session is missing, or when set_pinned reports failure
    # (e.g. the session disappeared between the read and the update).
    session = await store.get(session_id)
    if session is None:
        raise HTTPException(status_code=404, detail="Session not found")
    check_session_access(session, user)

    updated = await store.set_pinned(session_id, body.pinned)
    if updated:
        return {"session_id": session_id, "pinned": body.pinned}
    raise HTTPException(status_code=404, detail="Session not found")


@router.get("/{session_id}/context")
async def get_session_context(
    session_id: str,
    store: Annotated[SessionStore, Depends(get_session_store)],
    user: Annotated[User, Depends(require_admin)],
) -> dict:
    """Return the LLM context (what the model actually sees) for debugging."""
    # Admin-only via the require_admin dependency; `user` itself is unused.
    session = await store.get(session_id)
    if session is None:
        raise HTTPException(status_code=404, detail="Session not found")

    # Summed length of message bodies; content-less messages count as 0.
    total_chars = sum(len(msg.content or "") for msg in session.context)
    serialized = [msg.model_dump(mode="json", exclude_none=True) for msg in session.context]
    return {
        "session_id": session.id,
        "profile_id": session.profile_id,
        "message_count": len(session.context),
        "total_chars": total_chars,
        "context": serialized,
    }


@router.get("/{session_id}/planning")
async def get_session_planning(
    session_id: str,
    store: Annotated[SessionStore, Depends(get_session_store)],
    user: Annotated[User, Depends(require_admin)],
) -> dict:
    """Return all planning phase debug logs for a session."""
    # Admin-only via the require_admin dependency; `user` itself is unused.
    session = await store.get(session_id)
    if session is None:
        raise HTTPException(status_code=404, detail="Session not found")
    logs = session.planning_logs
    return {"session_id": session.id, "logs": logs}


@router.get("/{session_id}/content")
async def get_session_content(
    session_id: str,
    store: Annotated[SessionStore, Depends(get_session_store)],
) -> dict:
    """Return published inline content records for this session.

    Public — no auth required. The session ID acts as an unguessable
    capability token for accessing shared/published files.
    """
    # No user dependency on purpose: only session existence is checked.
    session = await store.get(session_id)
    if session is None:
        raise HTTPException(status_code=404, detail="Session not found")
    records = await list_for_session(session_id)
    return {"session_id": session.id, "content": records}


def _open_unique(dest):
    """Exclusively create *dest*, appending ``_1``, ``_2``, … to the stem on collision.

    Returns ``(handle, path)``: a binary file handle opened for writing and the
    path that was actually created. Mode ``"xb"`` makes "does it exist?" and
    "create it" one atomic step, so two concurrent uploads with the same name
    cannot overwrite each other. Any ``OSError`` other than
    ``FileExistsError`` propagates to the caller.
    """
    stem, suffix = dest.stem, dest.suffix
    candidate = dest
    attempt = 0
    while True:
        try:
            return candidate.open("xb"), candidate
        except FileExistsError:
            attempt += 1
            candidate = dest.with_name(f"{stem}_{attempt}{suffix}")


@router.post("/{session_id}/files", status_code=201)
async def upload_file(
    session_id: str,
    file: UploadFile,
    store: Annotated[SessionStore, Depends(get_session_store)],
    user: Annotated[User, Depends(require_user)],
) -> dict:
    """Store an uploaded file in the session's file directory.

    The filename is sanitized; names rejected by ``is_forbidden`` get a 400.
    On a name collision a ``_N`` suffix is added to the stem. Uploads larger
    than ``settings.session_files_max_size_mb`` get a 413, and other I/O
    failures a 500. In every failure case, including client disconnects, the
    partially written file is removed.
    """
    session = await store.get(session_id)
    if session is None:
        raise HTTPException(status_code=404, detail="Session not found")
    check_session_access(session, user)

    name = safe_filename(file.filename or "upload")
    if is_forbidden(name):
        raise HTTPException(status_code=400, detail="Executable files are not allowed")

    max_bytes = settings.session_files_max_size_mb * 1024 * 1024
    dest = ensure_session_dir(session_id) / name

    try:
        handle, dest = _open_unique(dest)
    except OSError as exc:
        raise HTTPException(status_code=500, detail=f"Upload failed: {exc}") from exc

    size = 0
    completed = False
    try:
        with handle:
            # Stream chunks directly to disk to avoid buffering the whole file in RAM
            while chunk := await file.read(1024 * 1024):  # 1 MB at a time
                size += len(chunk)
                if size > max_bytes:
                    raise HTTPException(
                        status_code=413,
                        detail=f"File exceeds {settings.session_files_max_size_mb} MB limit",
                    )
                await asyncio.to_thread(handle.write, chunk)
        completed = True
    except HTTPException:
        raise
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"Upload failed: {exc}") from exc
    finally:
        if not completed:
            # Covers the 413 limit, write/close errors and task cancellation
            # alike; a synchronous unlink is safe to run even while cancelling.
            dest.unlink(missing_ok=True)

    return {
        "name": dest.name,
        "size": size,
        "path": str(dest),
        "content_type": file.content_type or "application/octet-stream",
    }


@router.get("/{session_id}/files")
async def list_session_files_endpoint(
    session_id: str,
    store: Annotated[SessionStore, Depends(get_session_store)],
    user: Annotated[User, Depends(require_user)],
) -> dict:
    """Return all files and directories in the session's file directory (recursive, depth 10)."""
    session = await store.get(session_id)
    if session is None:
        raise HTTPException(status_code=404, detail="Session not found")
    check_session_access(session, user, permission="navi.files.read_all")

    # list_session_files is synchronous and walks the directory tree; run it in
    # a worker thread so a large tree does not stall the event loop.
    files = await asyncio.to_thread(list_session_files, session_id)
    return {"session_id": session_id, "files": files}


@router.get("/{session_id}/files/{filename}")
async def download_file(
    session_id: str,
    filename: str,
    store: Annotated[SessionStore, Depends(get_session_store)],
    download: bool = False,
) -> FileResponse:
    """Download a file from the session's file directory.

    Public — no auth required. The session ID acts as an unguessable
    capability token for accessing shared/published files.
    """
    session = await store.get(session_id)
    if session is None:
        raise HTTPException(status_code=404, detail="Session not found")

    # Resolve and verify the file is within the session directory (no path traversal).
    # resolve() follows symlinks, so a link pointing outside the directory is rejected too.
    base = session_dir(session_id).resolve()
    file_path = (base / filename).resolve()
    try:
        file_path.relative_to(base)
    except ValueError:
        raise HTTPException(status_code=403, detail="Access denied") from None

    if not file_path.is_file():  # also False when the path does not exist
        raise HTTPException(status_code=404, detail="File not found")

    # Detect content type; use inline disposition for browser-renderable types
    # unless the caller asked for ?download=true.
    content_type, _ = mimetypes.guess_type(file_path.name)
    content_type = content_type or "application/octet-stream"
    # SECURITY NOTE(review): this route is unauthenticated and the directory
    # can hold user uploads. Serving text/html (or image/svg+xml, matched by
    # "image/") inline lets such a file run script on this origin when opened
    # directly. Consider adding "Content-Security-Policy: sandbox" or forcing
    # attachment for those types if published HTML does not need scripts.
    inline_prefixes = ("image/", "text/html", "text/plain", "application/pdf")
    inline = not download and content_type.startswith(inline_prefixes)

    return FileResponse(
        path=file_path,
        filename=filename,
        media_type=content_type,
        content_disposition_type="inline" if inline else "attachment",
        # Stop browsers from MIME-sniffing the body into a more dangerous type.
        headers={"X-Content-Type-Options": "nosniff"},
    )


@router.post("/{session_id}/generate-name")
async def generate_name(
    session_id: str,
    store: Annotated[SessionStore, Depends(get_session_store)],
    backends: Annotated[BackendRegistry, Depends(get_backend_registry)],
    user: Annotated[User, Depends(require_user)],
) -> dict:
    # Derive a name for the session from its user messages with the "ollama"
    # backend and the default Ollama model. The name is persisted only if the
    # generator returns a truthy value. Existing names are never replaced.
    session = await store.get(session_id)
    if session is None:
        raise HTTPException(status_code=404, detail="Session not found")
    check_session_access(session, user)

    if session.name:
        return {"name": session.name}

    prompts = [msg.content for msg in session.messages if msg.role == "user" and msg.content]
    if not prompts:
        return {"name": None}

    name = await generate_session_name(prompts, backends.get("ollama"), settings.ollama_default_model)
    if name:
        await store.set_name(session_id, name)
    return {"name": name}


@router.delete("/{session_id}", status_code=204)
async def delete_session(
    session_id: str,
    store: Annotated[SessionStore, Depends(get_session_store)],
    user: Annotated[User, Depends(require_user)],
) -> None:
    # Tear down a session in order: close its persistent terminals, delete the
    # stored record, then remove its on-disk file directory. The
    # "navi.sessions.delete_all" permission is handed to check_session_access,
    # presumably letting holders delete sessions they do not own — confirm there.
    session = await store.get(session_id)
    if session is None:
        raise HTTPException(status_code=404, detail="Session not found")
    check_session_access(session, user, permission="navi.sessions.delete_all")

    from navi.api.deps import get_orchestrator

    # Walks into the orchestrator's private container; any missing link in the
    # chain means there are no terminals to close.
    orchestrator = get_orchestrator()
    container = orchestrator._container if orchestrator else None
    terminals = container.terminal_manager if container else None
    if terminals:
        await terminals.close_all(session_id)

    if not await store.delete(session_id):
        raise HTTPException(status_code=404, detail="Session not found")
    await delete_session_dir(session_id)


@router.get("/{session_id}/messages/archive")
async def get_session_archived_messages(
    session_id: str,
    store: Annotated[SessionStore, Depends(get_session_store)],
    user: Annotated[User, Depends(require_user)],
    before_seq: int | None = None,
    limit: int = Query(50, ge=1, le=200),
) -> dict:
    """Return older archived messages for scroll-up loading."""
    session = await store.get(session_id)
    if session is None:
        raise HTTPException(status_code=404, detail="Session not found")
    check_session_access(session, user)

    batch = await store.get_archived_messages(session_id, before_seq=before_seq, limit=limit)
    # Cursor for the next request: sequence number of the first item in this batch.
    cursor = batch[0].sequence_number if batch else None
    return {
        "items": [msg.model_dump(mode="json", exclude_none=True) for msg in batch],
        # A full batch is taken to mean more may exist; when the remainder is
        # an exact multiple of `limit` the client sees one final empty page.
        "has_more": len(batch) == limit,
        "next_before_seq": cursor,
    }


@router.get("/{session_id}/recall")
async def get_session_recall(
    session_id: str,
    store: Annotated[SessionStore, Depends(get_session_store)],
    user: Annotated[User, Depends(require_user)],
    scheduler: Annotated[RecallScheduler, Depends(get_scheduler)],
) -> dict:
    # Report the session's pending recall, if any. Only the first 50 recalls
    # returned by the scheduler are inspected, and the first "pending" one wins.
    # NOTE(review): the two response shapes differ — a flat recall object when
    # found, {"recall": None} otherwise — so clients must handle both.
    session = await store.get(session_id)
    if session is None:
        raise HTTPException(status_code=404, detail="Session not found")
    check_session_access(session, user)

    recalls = await scheduler.list_recalls(
        session_id=session_id,
        user_id=user.id,
        is_admin=user.role == "admin",
        limit=50,
    )
    pending = next((r for r in recalls if r.status == "pending"), None)
    if pending is None:
        return {"recall": None}
    return {
        "id": pending.id,
        "call_type": pending.call_type,
        "trigger_at": pending.trigger_at.isoformat(),
        "interval_seconds": pending.interval_seconds,
        "internal_comment": pending.internal_comment,
        "additional_context_message": pending.additional_context_message,
        "status": pending.status,
    }


@router.delete("/{session_id}/recall")
async def cancel_session_recall(
    session_id: str,
    store: Annotated[SessionStore, Depends(get_session_store)],
    user: Annotated[User, Depends(require_user)],
    scheduler: Annotated[RecallScheduler, Depends(get_scheduler)],
) -> dict:
    # Cancel the session's scheduled recall and respond {"ok": <scheduler result>}.
    # A RecallUpdate(status="cancelled") event is broadcast only on success.
    session = await store.get(session_id)
    if session is None:
        raise HTTPException(status_code=404, detail="Session not found")
    check_session_access(session, user)

    cancelled = await scheduler.cancel_recall(session_id)
    if cancelled:
        # Deferred imports, needed only when an event is published.
        from navi.core.event_bus import get_event_bus
        from navi.core.events import RecallUpdate

        bus = get_event_bus()
        await bus.publish(RecallUpdate(session_id=session_id, status="cancelled", action="cancelled"))
    return {"ok": cancelled}


@router.post("/{session_id}/recall/skip")
async def skip_session_recall(
    session_id: str,
    store: Annotated[SessionStore, Depends(get_session_store)],
    user: Annotated[User, Depends(require_user)],
    scheduler: Annotated[RecallScheduler, Depends(get_scheduler)],
) -> dict:
    # Skip the next occurrence of the session's recall and respond
    # {"ok": <scheduler result>}. On success a RecallUpdate with
    # status="pending", action="rescheduled" is broadcast.
    session = await store.get(session_id)
    if session is None:
        raise HTTPException(status_code=404, detail="Session not found")
    check_session_access(session, user)

    skipped = await scheduler.skip_next_recall(session_id)
    if skipped:
        # Deferred imports, needed only when an event is published.
        from navi.core.event_bus import get_event_bus
        from navi.core.events import RecallUpdate

        bus = get_event_bus()
        await bus.publish(RecallUpdate(session_id=session_id, status="pending", action="rescheduled"))
    return {"ok": skipped}