"""Content store — manages published content files for inline viewing.
Files are stored in navi/content/<uuid>/ and served via /content/<uuid>/filename.
Metadata is tracked in the database for lifecycle management.
"""
import asyncio
import shutil
import uuid
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import TYPE_CHECKING
import structlog
from navi.config import settings
if TYPE_CHECKING:
import asyncpg
log = structlog.get_logger()
_CONTENT_DIR = Path(__file__).parent / "content"
_CONTENT_DIR.mkdir(exist_ok=True)
# Auto-detect content type from file extension
_EXT_TO_TYPE = {
".stl": "stl",
".html": "html",
".htm": "html",
".svg": "svg",
".pdf": "pdf",
".png": "image",
".jpg": "image",
".jpeg": "image",
".gif": "image",
".webp": "image",
".mp4": "video",
".webm": "video",
}
def _detect_content_type(filename: str) -> str:
ext = Path(filename).suffix.lower()
return _EXT_TO_TYPE.get(ext, "unknown")
async def _get_db_pool():
"""Get asyncpg pool from deps (lazy import to avoid circular)."""
from navi.api.deps import _memory_store
return await _memory_store._get_pool()
async def ensure_tables() -> None:
"""Create session_content table if missing."""
try:
pool = await _get_db_pool()
except Exception:
log.warning("content_store.no_db", exc_info=True)
return
async with pool.acquire() as conn:
await conn.execute(
"""CREATE TABLE IF NOT EXISTS session_content (
id TEXT PRIMARY KEY,
session_id TEXT NOT NULL,
filename TEXT NOT NULL,
content_type TEXT NOT NULL,
title TEXT,
created_at TIMESTAMPTZ NOT NULL
)"""
)
await conn.execute(
"CREATE INDEX IF NOT EXISTS idx_session_content_session ON session_content (session_id)"
)
async def publish(
session_id: str,
src_path: Path,
filename: str | None = None,
title: str | None = None,
content_type: str | None = None,
) -> dict:
"""Copy a file into the content store and record metadata.
Returns {"id": "uuid", "url": "...", "filename": "...", "content_type": "..."}
"""
src = Path(src_path).expanduser().resolve()
if not src.exists():
raise FileNotFoundError(f"Source file not found: {src}")
content_id = str(uuid.uuid4())[:8]
dest_dir = _CONTENT_DIR / content_id
dest_dir.mkdir(parents=True, exist_ok=True)
dest_name = filename or src.name
dest = dest_dir / dest_name
# Avoid clobbering
if dest.exists():
stem = Path(dest_name).stem
suffix = Path(dest_name).suffix
i = 1
while dest.exists():
dest = dest_dir / f"{stem}_{i}{suffix}"
i += 1
await asyncio.to_thread(shutil.copy2, str(src), str(dest))
detected_type = content_type or _detect_content_type(dest.name)
# Record in DB
try:
pool = await _get_db_pool()
async with pool.acquire() as conn:
await conn.execute(
"""INSERT INTO session_content
(id, session_id, filename, content_type, title, created_at)
VALUES ($1, $2, $3, $4, $5, $6)""",
content_id,
session_id,
dest.name,
detected_type,
title or dest.name,
datetime.now(timezone.utc),
)
except Exception:
log.warning("content_store.db_insert_failed", content_id=content_id, exc_info=True)
# Clean up orphaned file so we don't leak untracked content on disk
try:
if dest.exists():
dest.unlink()
if dest_dir.exists():
dest_dir.rmdir()
except Exception:
log.warning("content_store.cleanup_failed", content_id=content_id, exc_info=True)
raise
base_url = settings.public_url.rstrip("/")
url = f"{base_url}/content/{content_id}/{dest.name}"
log.info("content_store.published", content_id=content_id, type=detected_type, url=url)
return {
"id": content_id,
"url": url,
"filename": dest.name,
"content_type": detected_type,
"title": title or dest.name,
}
async def list_for_session(session_id: str) -> list[dict]:
"""Return all content items for a session."""
try:
pool = await _get_db_pool()
except Exception:
return []
async with pool.acquire() as conn:
rows = await conn.fetch(
"SELECT id, filename, content_type, title, created_at FROM session_content WHERE session_id=$1 ORDER BY created_at DESC",
session_id,
)
return [
{
"id": r["id"],
"filename": r["filename"],
"content_type": r["content_type"],
"title": r["title"],
"created_at": r["created_at"].isoformat() if hasattr(r["created_at"], "isoformat") else str(r["created_at"]),
}
for r in rows
]
async def delete_content(content_id: str) -> bool:
"""Remove a content item from disk and DB."""
dest_dir = _CONTENT_DIR / content_id
try:
if dest_dir.exists():
await asyncio.to_thread(shutil.rmtree, str(dest_dir))
except Exception:
log.warning("content_store.rmtree_failed", content_id=content_id, exc_info=True)
try:
pool = await _get_db_pool()
async with pool.acquire() as conn:
result = await conn.execute("DELETE FROM session_content WHERE id=$1", content_id)
return int(result.split()[1]) > 0
except Exception:
log.warning("content_store.db_delete_failed", content_id=content_id, exc_info=True)
return False
async def cleanup_old(days: int = 30) -> int:
"""Delete content older than N days. Returns count deleted."""
cutoff = datetime.now(timezone.utc) - timedelta(days=days)
try:
pool = await _get_db_pool()
except Exception:
return 0
async with pool.acquire() as conn:
rows = await conn.fetch(
"SELECT id FROM session_content WHERE created_at < $1", cutoff
)
deleted = 0
for r in rows:
if await delete_content(r["id"]):
deleted += 1
return deleted