"""Session management endpoints."""
import asyncio
import mimetypes
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Annotated

import structlog
from fastapi import APIRouter, Depends, HTTPException, UploadFile
from fastapi.responses import FileResponse
from pydantic import BaseModel

from navi.api.deps import get_backend_registry, get_memory_store, get_profile_registry, get_session_store
from navi.config import settings
from navi.core import BackendRegistry, ProfileRegistry, SessionStore
from navi.core.name_generator import generate_session_name
from navi.exceptions import ProfileNotFound
from navi.memory import MemoryStore
from navi.session_files import delete_session_dir, ensure_session_dir, is_forbidden, safe_filename, session_dir
# Structured logger and the router every endpoint in this module registers on.
log = structlog.get_logger()
router = APIRouter(tags=["sessions"], prefix="/sessions")

# Guards the background stale-session memory extraction started by
# create_session: while one extraction sweep holds this lock, newly created
# sessions skip starting another, so rapid session creation does not pile up
# overlapping sweeps.
_stale_session_lock: asyncio.Lock = asyncio.Lock()
class CreateSessionRequest(BaseModel):
    """Request body for ``POST /sessions``: which profile the new session uses."""

    profile_id: str
class PinSessionRequest(BaseModel):
    """Request body for ``PATCH /sessions/{session_id}/pin``."""

    pinned: bool  # desired pinned state, passed straight to SessionStore.set_pinned
# Strong references to fire-and-forget background tasks. The event loop keeps
# only weak references to tasks, so a task nobody references can be garbage
# collected before it finishes; entries are removed when the task completes.
_background_tasks: set[asyncio.Task] = set()


def _on_background_task_done(task: asyncio.Task) -> None:
    """Release a finished background task and log it if it failed."""
    _background_tasks.discard(task)
    if task.cancelled():
        return
    exc = task.exception()
    if exc is not None:
        log.error("memory.stale_extraction_task_failed", exc_info=exc)


@router.post("", status_code=201)
async def create_session(
    body: CreateSessionRequest,
    store: Annotated[SessionStore, Depends(get_session_store)],
    profiles: Annotated[ProfileRegistry, Depends(get_profile_registry)],
    memory: Annotated[MemoryStore, Depends(get_memory_store)],
) -> dict:
    """Create a new session for an existing profile.

    As a side effect, schedules a background sweep (``_process_stale_sessions``)
    that extracts memory from sessions idle for more than 30 minutes. At most
    one sweep runs at a time; requests arriving while one is running do not
    start another.

    Returns:
        ``session_id``, ``profile_id`` and ISO-8601 ``created_at`` of the new session.

    Raises:
        HTTPException: 404 if ``body.profile_id`` is not a registered profile.
    """
    try:
        profiles.get(body.profile_id)
    except ProfileNotFound:
        raise HTTPException(status_code=404, detail=f"Profile '{body.profile_id}' not found") from None

    session = await store.create(body.profile_id)

    # Fire-and-forget: extract memory from any stale sessions (last active > 30 min ago)
    async def _deduped_extract() -> None:
        # Bail out instead of queueing behind a sweep that is already running.
        if _stale_session_lock.locked():
            return
        async with _stale_session_lock:
            await _process_stale_sessions(store, memory)

    task = asyncio.create_task(_deduped_extract())
    _background_tasks.add(task)
    task.add_done_callback(_on_background_task_done)

    return {
        "session_id": session.id,
        "profile_id": session.profile_id,
        "created_at": session.created_at.isoformat(),
    }
async def _process_stale_sessions(store: SessionStore, memory: MemoryStore) -> None:
    """Extract facts from sessions idle for >30 min that haven't been processed yet.

    A session is processed when it has at least one message, its
    ``last_active`` is more than 30 minutes in the past, and the memory store
    has no extraction recorded at or after ``last_active``. Extraction uses the
    ``"ollama"`` backend with ``settings.ollama_default_model``.

    Errors while listing sessions or resolving the backend are logged and end
    the sweep. Errors on an individual session are logged and only skip that
    session.
    """
    # NOTE(review): kept as a function-local import, presumably to avoid an
    # import cycle or start-up cost; confirm before hoisting to module level.
    from navi.memory.extractor import extract_and_update

    cutoff = datetime.now(timezone.utc) - timedelta(minutes=30)
    try:
        sessions = await store.list_all()
        backend = get_backend_registry().get("ollama")
    except Exception:
        log.warning("memory.stale_sweep_setup_failed", exc_info=True)
        return

    for session in sessions:
        if not session.messages or session.last_active >= cutoff:
            continue  # nothing to extract, or still active
        try:
            extracted_at = await memory.get_extracted_at(session.id)
            # NOTE(review): this is a lexicographic comparison of ISO-8601
            # strings. It assumes get_extracted_at returns a str in the same
            # format/UTC offset as datetime.isoformat(); confirm in MemoryStore.
            if extracted_at and extracted_at >= session.last_active.isoformat():
                continue  # already up to date
            await extract_and_update(session, backend, settings.ollama_default_model, memory)
        except Exception:
            log.warning("memory.extraction_failed", session_id=session.id, exc_info=True)
def _session_summary(session) -> dict:
    """Build the list-view entry for one session (no message bodies)."""
    return {
        "session_id": session.id,
        "profile_id": session.profile_id,
        "name": session.name,
        "message_count": len(session.messages),
        "preview": _preview(session),
        "pinned": session.pinned,
        "created_at": session.created_at.isoformat(),
        "last_active": session.last_active.isoformat(),
    }


@router.get("")
async def list_sessions(
    store: Annotated[SessionStore, Depends(get_session_store)],
) -> list[dict]:
    """List every session in the store as a lightweight summary.

    Each entry holds the ids, name, message count, a short preview of the last
    user/assistant message, the pinned flag and ISO-8601 timestamps, in the
    order ``store.list_all()`` returns them.
    """
    every_session = await store.list_all()
    return list(map(_session_summary, every_session))
def _preview(session) -> str:
"""Return a short snippet from the last user or assistant message."""
for msg in reversed(session.messages):
if msg.role in ("user", "assistant") and msg.content:
return msg.content[:60]
return ""
@router.get("/{session_id}")
async def get_session(
    session_id: str,
    store: Annotated[SessionStore, Depends(get_session_store)],
) -> dict:
    """Return one session's metadata, full message history and token usage.

    ``max_context_tokens`` reports the configured ``settings.ollama_num_ctx``.
    Messages are dumped in JSON mode with ``None`` fields omitted.

    Raises:
        HTTPException: 404 if the session does not exist.
    """
    found = await store.get(session_id)
    if found is None:
        raise HTTPException(status_code=404, detail="Session not found")

    payload: dict = {
        "session_id": found.id,
        "profile_id": found.profile_id,
        "name": found.name,
    }
    payload["messages"] = [
        msg.model_dump(mode="json", exclude_none=True) for msg in found.messages
    ]
    payload["context_token_count"] = found.context_token_count
    payload["max_context_tokens"] = settings.ollama_num_ctx
    payload["created_at"] = found.created_at.isoformat()
    payload["last_active"] = found.last_active.isoformat()
    return payload
@router.patch("/{session_id}/pin")
async def pin_session(
    session_id: str,
    body: PinSessionRequest,
    store: Annotated[SessionStore, Depends(get_session_store)],
) -> dict:
    """Set a session's pinned flag to ``body.pinned`` and echo the new state.

    Raises:
        HTTPException: 404 when ``store.set_pinned`` reports failure.
    """
    if not await store.set_pinned(session_id, body.pinned):
        raise HTTPException(status_code=404, detail="Session not found")
    return {"session_id": session_id, "pinned": body.pinned}
@router.get("/{session_id}/context")
async def get_session_context(
    session_id: str,
    store: Annotated[SessionStore, Depends(get_session_store)],
) -> dict:
    """Return the LLM context (what the model actually sees) for debugging.

    Serialises ``session.context`` rather than ``session.messages``, and adds
    the total character length of the context messages' content (``None``
    content counts as 0).

    Raises:
        HTTPException: 404 if the session does not exist.
    """
    session = await store.get(session_id)
    if session is None:
        raise HTTPException(status_code=404, detail="Session not found")

    ctx = session.context
    char_count = sum(map(len, (entry.content or "" for entry in ctx)))
    return {
        "session_id": session.id,
        "profile_id": session.profile_id,
        "message_count": len(ctx),
        "total_chars": char_count,
        "context": [entry.model_dump(mode="json", exclude_none=True) for entry in ctx],
    }
@router.get("/{session_id}/planning")
async def get_session_planning(
    session_id: str,
    store: Annotated[SessionStore, Depends(get_session_store)],
) -> dict:
    """Return the stored planning-phase debug logs for a session.

    Raises:
        HTTPException: 404 if the session does not exist.
    """
    session = await store.get(session_id)
    if session is not None:
        return {"session_id": session.id, "logs": session.planning_logs}
    raise HTTPException(status_code=404, detail="Session not found")
def _write_unique(directory: Path, name: str, data: bytes) -> Path:
    """Write *data* to a new file in *directory* without overwriting anything.

    Tries ``name`` first, then ``<stem>_1<suffix>``, ``<stem>_2<suffix>``, ...
    Each attempt uses exclusive-create mode, so two concurrent writers can never
    claim the same path. If writing fails after the file was created, that file
    is removed before the error propagates. Returns the path actually written.
    """
    first = directory / name
    stem, suffix = first.stem, first.suffix
    candidate = first
    counter = 1
    while True:
        try:
            handle = open(candidate, "xb")  # raises FileExistsError if taken
        except FileExistsError:
            candidate = directory / f"{stem}_{counter}{suffix}"
            counter += 1
            continue
        try:
            with handle:
                handle.write(data)
        except BaseException:
            # Only remove the file this call created; never touch others.
            candidate.unlink(missing_ok=True)
            raise
        return candidate


@router.post("/{session_id}/files", status_code=201)
async def upload_file(
    session_id: str,
    file: UploadFile,
    store: Annotated[SessionStore, Depends(get_session_store)],
) -> dict:
    """Store an uploaded file in the session's file directory.

    The client filename is sanitised with ``safe_filename``. A clash with an
    existing file gets a ``_1``, ``_2``, ... suffix before the extension.

    Returns:
        The stored ``name``, byte ``size``, server-side ``path`` and the
        declared ``content_type`` (default ``application/octet-stream``).

    Raises:
        HTTPException: 404 unknown session, 400 forbidden (executable) file,
            413 file larger than ``settings.session_files_max_size_mb``,
            500 if writing to disk fails.
    """
    session = await store.get(session_id)
    if session is None:
        raise HTTPException(status_code=404, detail="Session not found")
    name = safe_filename(file.filename or "upload")
    if is_forbidden(name):
        raise HTTPException(status_code=400, detail="Executable files are not allowed")

    max_bytes = settings.session_files_max_size_mb * 1024 * 1024
    directory = ensure_session_dir(session_id)

    # Read the upload 1 MB at a time and reject it as soon as it exceeds the limit.
    chunks: list[bytes] = []
    size = 0
    while chunk := await file.read(1024 * 1024):
        size += len(chunk)
        if size > max_bytes:
            raise HTTPException(
                status_code=413,
                detail=f"File exceeds {settings.session_files_max_size_mb} MB limit",
            )
        chunks.append(chunk)

    # The destination name is chosen at write time, with exclusive create,
    # rather than before reading the body. Otherwise concurrent uploads of the
    # same name could pick the same path and silently overwrite each other.
    try:
        dest = await asyncio.to_thread(_write_unique, directory, name, b"".join(chunks))
    except Exception as exc:
        log.warning("session_files.upload_failed", session_id=session_id, exc_info=True)
        raise HTTPException(status_code=500, detail=f"Upload failed: {exc}") from exc

    return {
        "name": dest.name,
        "size": size,
        "path": str(dest),
        "content_type": file.content_type or "application/octet-stream",
    }
# MIME-type prefixes a browser may render in place instead of downloading.
_INLINE_TYPE_PREFIXES = ("image/", "text/html", "text/plain", "application/pdf")
# Inline-rendered types that can run script. Session files come from user
# uploads, so these are served with a CSP sandbox. That gives the document an
# opaque origin and blocks scripts, so an uploaded page cannot act as stored
# XSS against the API's origin.
_ACTIVE_CONTENT_TYPES = frozenset({"text/html", "image/svg+xml"})


@router.get("/{session_id}/files/{filename}")
async def download_file(
    session_id: str,
    filename: str,
    store: Annotated[SessionStore, Depends(get_session_store)],
) -> FileResponse:
    """Download a file from the session's file directory.

    Images, HTML, plain text and PDF are sent with ``inline`` disposition;
    everything else as ``attachment``. HTML and SVG are additionally sandboxed
    via ``Content-Security-Policy``, and every response carries
    ``X-Content-Type-Options: nosniff``.

    Raises:
        HTTPException: 404 if the session or file does not exist, 403 if
            *filename* resolves outside the session directory.
    """
    session = await store.get(session_id)
    if session is None:
        raise HTTPException(status_code=404, detail="Session not found")

    # Resolve (following symlinks and "..") and verify the target stays inside
    # the session directory, to block path traversal.
    base = session_dir(session_id).resolve()
    file_path = (base / filename).resolve()
    try:
        file_path.relative_to(base)
    except ValueError:
        raise HTTPException(status_code=403, detail="Access denied") from None
    if not file_path.is_file():  # also False when the path does not exist
        raise HTTPException(status_code=404, detail="File not found")

    content_type = mimetypes.guess_type(file_path.name)[0] or "application/octet-stream"
    inline = content_type.startswith(_INLINE_TYPE_PREFIXES)

    headers = {"X-Content-Type-Options": "nosniff"}
    if content_type in _ACTIVE_CONTENT_TYPES:
        headers["Content-Security-Policy"] = "sandbox"

    return FileResponse(
        path=file_path,
        filename=filename,
        media_type=content_type,
        content_disposition_type="inline" if inline else "attachment",
        headers=headers,
    )
@router.post("/{session_id}/generate-name")
async def generate_name(
    session_id: str,
    store: Annotated[SessionStore, Depends(get_session_store)],
    backends: Annotated[BackendRegistry, Depends(get_backend_registry)],
) -> dict:
    """Generate and store a session name from its user messages, if it has none.

    An existing name is returned unchanged. With no non-empty user messages,
    ``{"name": None}`` is returned. Otherwise the ``"ollama"`` backend with
    ``settings.ollama_default_model`` produces a name, which is persisted only
    when truthy and is returned either way.

    Raises:
        HTTPException: 404 if the session does not exist.
    """
    session = await store.get(session_id)
    if session is None:
        raise HTTPException(status_code=404, detail="Session not found")
    if session.name:
        return {"name": session.name}

    prompts = [msg.content for msg in session.messages if msg.role == "user" and msg.content]
    if not prompts:
        return {"name": None}

    name = await generate_session_name(prompts, backends.get("ollama"), settings.ollama_default_model)
    if name:
        await store.set_name(session_id, name)
    return {"name": name}
@router.delete("/{session_id}", status_code=204)
async def delete_session(
    session_id: str,
    store: Annotated[SessionStore, Depends(get_session_store)],
) -> None:
    """Delete a session, then remove its file directory via ``delete_session_dir``.

    Raises:
        HTTPException: 404 if the store reports nothing was deleted; the file
            directory is not touched in that case.
    """
    if not await store.delete(session_id):
        raise HTTPException(status_code=404, detail="Session not found")
    await delete_session_dir(session_id)