"""Content store — tracks session files published for inline viewing.
Files live in the session directory (settings.SESSION_FILES_DIR/<session_id>/).
Publishing only registers metadata in the DB; the file itself is NOT copied.
The published URL is {settings.public_url}/api/sessions/{id}/files/{filename}, i.e. the existing session-files endpoint.
"""
import asyncio
import uuid
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import TYPE_CHECKING
import structlog
from navi.config import settings
from navi.session_files import ensure_session_dir, session_dir
if TYPE_CHECKING:
import asyncpg
# Module-level structlog logger used by the functions in this file.
log = structlog.get_logger()
# Lookup from file extension (lower-case, leading dot included) to the
# viewer type used for inline display. Written grouped by viewer type;
# the resulting dict has one entry per extension, in the order listed.
_EXT_TO_TYPE = {
    ext: kind
    for kind, exts in (
        ("stl", (".stl",)),
        ("html", (".html", ".htm")),
        ("svg", (".svg",)),
        ("pdf", (".pdf",)),
        ("image", (".png", ".jpg", ".jpeg", ".gif", ".webp")),
        ("video", (".mp4", ".webm")),
    )
    for ext in exts
}
def _detect_content_type(filename: str) -> str:
    """Map a filename to its viewer type based on the file extension.

    The suffix is lower-cased before the lookup, so matching is
    case-insensitive. Any suffix missing from ``_EXT_TO_TYPE`` (including
    no suffix at all) yields ``"unknown"``.
    """
    suffix = Path(filename).suffix
    return _EXT_TO_TYPE.get(suffix.lower(), "unknown")
def _file_url(session_id: str, filename: str, *, download: bool = False) -> str:
    """Build the public URL of a session file.

    Args:
        session_id: Session that owns the file.
        filename: File name relative to the session directory. "/" is kept
            as a path separator; every other reserved character is
            percent-encoded so that names containing spaces, "#", "?" or
            "%" still resolve to the intended path instead of being cut
            off as a fragment/query or mis-decoded.
        download: When true, ``?download=1`` is appended to the URL.

    Returns:
        ``{settings.public_url}/api/sessions/{session_id}/files/{filename}``
        with trailing "/" characters stripped from ``settings.public_url``.
    """
    # Function-scope import: keeps the fix self-contained in this block.
    from urllib.parse import quote

    base_url = settings.public_url.rstrip("/")
    # quote() leaves "/" and unreserved characters untouched by default.
    url = f"{base_url}/api/sessions/{session_id}/files/{quote(filename)}"
    return f"{url}?download=1" if download else url
def _file_updated_at(session_id: str, filename: str, fallback: datetime) -> datetime:
    """Return the file's modification time as a timezone-aware UTC datetime.

    ``fallback`` is returned unchanged when stat'ing the file raises
    ``OSError`` (for example because the file does not exist).
    """
    target = session_dir(session_id) / filename
    try:
        mtime = target.stat().st_mtime
    except OSError:
        return fallback
    return datetime.fromtimestamp(mtime, tz=timezone.utc)
def _existing_source_filename(session_id: str, source_filename: str | None) -> str | None:
    """Return the bare source file name if that file exists in the session directory.

    Only the final path component of ``source_filename`` is used, so any
    directory part is discarded. ``None`` is returned for an empty or
    ``None`` input, and when no regular file with that name is present.
    """
    if source_filename:
        bare_name = Path(source_filename).name
        if (session_dir(session_id) / bare_name).is_file():
            return bare_name
    return None
def _row_get(row, key: str, default=None):
    """Subscript ``row`` with ``key``, falling back to ``default``.

    ``default`` is returned only when the lookup raises ``KeyError`` or
    ``IndexError``; any other exception propagates to the caller.
    """
    try:
        value = row[key]
    except (KeyError, IndexError):
        value = default
    return value
async def _get_db_pool():
    """Return the asyncpg pool held by the shared memory store.

    ``navi.api.deps`` is imported inside the function (not at module
    level) to avoid a circular import.
    """
    from navi.api.deps import get_memory_store

    return await get_memory_store()._get_pool()
async def ensure_tables() -> None:
    """Create the session_content table and its indexes if they are missing.

    When the DB pool cannot be obtained, a ``content_store.no_db`` warning
    is logged and the function returns without doing anything. Errors
    raised while executing the statements themselves are not caught here.
    """
    try:
        pool = await _get_db_pool()
    except Exception:
        log.warning("content_store.no_db", exc_info=True)
        return

    # Run in this order: the table has to exist before the ALTER and the
    # index statements can refer to it.
    ddl_statements = (
        """CREATE TABLE IF NOT EXISTS session_content (
id TEXT PRIMARY KEY,
session_id TEXT NOT NULL,
filename TEXT NOT NULL,
content_type TEXT NOT NULL,
title TEXT,
source_filename TEXT,
created_at TIMESTAMPTZ NOT NULL
)""",
        "ALTER TABLE session_content ADD COLUMN IF NOT EXISTS source_filename TEXT",
        "CREATE INDEX IF NOT EXISTS idx_session_content_session ON session_content (session_id)",
        # The unique (session_id, filename) index is the conflict target of
        # the upsert performed by publish().
        "CREATE UNIQUE INDEX IF NOT EXISTS idx_session_content_file "
        "ON session_content (session_id, filename)",
    )
    async with pool.acquire() as conn:
        for statement in ddl_statements:
            await conn.execute(statement)
async def publish(
    session_id: str,
    filename: str,
    title: str | None = None,
    content_type: str | None = None,
    source_filename: str | None = None,
) -> dict:
    """Register an existing session file for inline viewing.

    Only a metadata row is upserted into ``session_content``; the file
    itself is neither copied nor modified.

    Args:
        session_id: Session whose directory holds the file.
        filename: Path of the file relative to the session directory.
        title: Display title; ``filename`` is used when this is empty.
        content_type: Explicit viewer type; detected from the file
            extension when this is empty.
        source_filename: Optional companion source file. Only its final
            path component is used, and it is dropped when no such file
            exists in the session directory.

    Returns:
        Dict with ``id``, ``url``, ``download_url``, ``filename``,
        ``content_type``, ``title`` and ``updated_at`` (ISO 8601 string),
        plus ``source_filename`` and ``source_url`` when a source file
        was found.

    Raises:
        FileNotFoundError: ``filename`` does not exist in the session directory.
        IsADirectoryError: The path exists but is not a regular file.
        Exception: Any error from the DB upsert is logged and re-raised.
    """
    target = ensure_session_dir(session_id) / filename
    if not target.exists():
        raise FileNotFoundError(
            f"File '{filename}' not found in session directory: {session_dir(session_id)}"
        )
    if not target.is_file():
        raise IsADirectoryError(f"Path is a directory, not a file: {target}")

    detected_type = content_type if content_type else _detect_content_type(filename)
    linked_source = _existing_source_filename(session_id, source_filename)

    # Both kinds of path separator are flattened to "_" to form the row id.
    # NOTE(review): "a/b" and "a_b" flatten to the same id, while the upsert
    # below resolves conflicts on (session_id, filename) only — confirm that
    # such a pair cannot occur, otherwise the primary key would clash.
    flat_name = filename.translate(str.maketrans({"/": "_", "\\": "_"}))
    content_id = f"{session_id}_{flat_name}"
    display_title = title or filename

    upsert_sql = """INSERT INTO session_content
(id, session_id, filename, content_type, title, source_filename, created_at)
VALUES ($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT (session_id, filename) DO UPDATE
SET content_type = EXCLUDED.content_type,
title = EXCLUDED.title,
source_filename = EXCLUDED.source_filename,
created_at = EXCLUDED.created_at"""
    try:
        pool = await _get_db_pool()
        async with pool.acquire() as conn:
            await conn.execute(
                upsert_sql,
                content_id,
                session_id,
                filename,
                detected_type,
                display_title,
                linked_source,
                datetime.now(timezone.utc),
            )
    except Exception:
        log.warning("content_store.db_upsert_failed", content_id=content_id, exc_info=True)
        raise

    url = _file_url(session_id, filename)
    # The file's mtime is reported as updated_at; "now" is only the fallback
    # for the case where the file cannot be stat'ed.
    updated_at = _file_updated_at(session_id, filename, datetime.now(timezone.utc))
    log.info(
        "content_store.published",
        content_id=content_id,
        session_id=session_id,
        filename=filename,
        type=detected_type,
        url=url,
    )

    info = {
        "id": content_id,
        "url": url,
        "download_url": _file_url(session_id, filename, download=True),
        "filename": filename,
        "content_type": detected_type,
        "title": display_title,
        "updated_at": updated_at.isoformat(),
    }
    if linked_source:
        info.update(
            source_filename=linked_source,
            source_url=_file_url(session_id, linked_source),
        )
    return info
async def list_for_session(session_id: str) -> list[dict]:
    """Return all published content items for a session, newest first.

    Args:
        session_id: Session whose published items are listed.

    Returns:
        One dict per row with ``id``, ``filename``, ``content_type``,
        ``title``, ``url``, ``download_url``, ``created_at`` and
        ``updated_at``; ``source_filename`` and ``source_url`` are added
        when the recorded source file still exists in the session
        directory. An empty list is returned when the DB pool cannot be
        obtained (a ``content_store.no_db`` warning is logged in that case).
    """
    try:
        pool = await _get_db_pool()
    except Exception:
        # Still best-effort, but leave a trace like ensure_tables() does
        # instead of hiding the outage behind an empty result.
        log.warning("content_store.no_db", session_id=session_id, exc_info=True)
        return []

    async with pool.acquire() as conn:
        rows = await conn.fetch(
            "SELECT id, filename, content_type, title, source_filename, created_at "
            "FROM session_content WHERE session_id=$1 ORDER BY created_at DESC",
            session_id,
        )

    # Rows are post-processed after the connection is released: the loop
    # only touches the filesystem, not the database.
    items = []
    for r in rows:
        raw_created = r["created_at"]

        # A datetime is needed as the fallback for updated_at; non-datetime
        # values are parsed, and "now" is used if parsing fails.
        created_at = raw_created
        if not isinstance(created_at, datetime):
            try:
                created_at = datetime.fromisoformat(str(created_at))
            except ValueError:
                created_at = datetime.now(timezone.utc)

        updated_at = _file_updated_at(session_id, r["filename"], created_at)
        source_filename = _existing_source_filename(session_id, _row_get(r, "source_filename"))

        item = {
            "id": r["id"],
            "filename": r["filename"],
            "content_type": r["content_type"],
            "title": r["title"],
            "url": _file_url(session_id, r["filename"]),
            "download_url": _file_url(session_id, r["filename"], download=True),
            # The stored value is reported as-is (not the parsed fallback above).
            "created_at": raw_created.isoformat() if hasattr(raw_created, "isoformat") else str(raw_created),
            "updated_at": updated_at.isoformat(),
        }
        if source_filename:
            item["source_filename"] = source_filename
            item["source_url"] = _file_url(session_id, source_filename)
        items.append(item)
    return items
async def delete_content(content_id: str) -> bool:
    """Delete one content row by id; the file on disk is left untouched.

    Returns:
        True when at least one row was deleted. False when no row matched
        or when any error occurred (the error is logged as a warning).
    """
    try:
        pool = await _get_db_pool()
        async with pool.acquire() as conn:
            status = await conn.execute("DELETE FROM session_content WHERE id=$1", content_id)
        # The second whitespace-separated token of the status string is
        # read as the number of deleted rows.
        removed = int(status.split()[1])
    except Exception:
        log.warning("content_store.db_delete_failed", content_id=content_id, exc_info=True)
        return False
    return removed > 0
async def cleanup_old(days: int = 30) -> int:
    """Delete DB records whose ``created_at`` is older than ``days`` days.

    Files on disk are not touched here (per the module's design they are
    managed by the session lifecycle).

    Args:
        days: Age threshold in days, measured back from the current UTC time.

    Returns:
        Number of rows deleted. 0 is returned when the DB pool cannot be
        obtained (a ``content_store.no_db`` warning is logged in that case).
    """
    cutoff = datetime.now(timezone.utc) - timedelta(days=days)
    try:
        pool = await _get_db_pool()
    except Exception:
        # Still best-effort, but make the skipped cleanup visible in logs
        # rather than reporting "0 deleted" with no explanation.
        log.warning("content_store.no_db", exc_info=True)
        return 0

    async with pool.acquire() as conn:
        result = await conn.execute(
            "DELETE FROM session_content WHERE created_at < $1", cutoff
        )
    # The second whitespace-separated token of the status string is read
    # as the number of deleted rows.
    deleted = int(result.split()[1])
    log.info("content_store.cleanup", deleted=deleted, cutoff=cutoff)
    return deleted