Newer
Older
navi-1 / navi / content_store.py
"""Content store — manages published content files for inline viewing.

Files are stored in navi/content/<id>/ and served via /content/<id>/<filename>,
where <id> is the first 8 hex characters of a random UUID4.
Metadata is tracked in the database for lifecycle management.
"""

import asyncio
import shutil
import uuid
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import TYPE_CHECKING

import structlog

from navi.config import settings

if TYPE_CHECKING:
    import asyncpg

log = structlog.get_logger()

# Root directory for published files; it sits next to this module and is
# created at import time if it does not exist yet.
_CONTENT_DIR = Path(__file__).parent.joinpath("content")
_CONTENT_DIR.mkdir(exist_ok=True)

# Lower-case file extension -> content type label, declared per label so the
# extensions that share a viewer are listed together.
_EXT_TO_TYPE = {
    ext: label
    for label, extensions in (
        ("stl", (".stl",)),
        ("html", (".html", ".htm")),
        ("svg", (".svg",)),
        ("pdf", (".pdf",)),
        ("image", (".png", ".jpg", ".jpeg", ".gif", ".webp")),
        ("video", (".mp4", ".webm")),
    )
    for ext in extensions
}


def _detect_content_type(filename: str) -> str:
    """Map *filename* to a content type label based on its extension.

    The extension is lower-cased before the lookup, so matching is
    case-insensitive. Extensions that are not listed in ``_EXT_TO_TYPE``
    (including a missing extension) yield ``"unknown"``.
    """
    suffix = Path(filename).suffix
    try:
        return _EXT_TO_TYPE[suffix.lower()]
    except KeyError:
        return "unknown"


async def _get_db_pool():
    """Return the asyncpg pool held by the shared memory store in ``navi.api.deps``.

    The import happens at call time rather than at module import time so that
    loading this module does not trigger a circular import.
    """
    from navi.api.deps import _memory_store as store

    pool = await store._get_pool()
    return pool


async def ensure_tables() -> None:
    """Create the ``session_content`` table and its session index if absent.

    Both statements use ``IF NOT EXISTS``, so calling this repeatedly is
    harmless. When the database pool cannot be obtained a warning is logged
    and nothing is created.
    """
    try:
        db = await _get_db_pool()
    except Exception:
        log.warning("content_store.no_db", exc_info=True)
        return

    # Executed in order on a single connection: table first, then the index
    # used for per-session lookups.
    ddl_statements = (
        """CREATE TABLE IF NOT EXISTS session_content (
                id          TEXT PRIMARY KEY,
                session_id  TEXT NOT NULL,
                filename    TEXT NOT NULL,
                content_type TEXT NOT NULL,
                title       TEXT,
                created_at  TIMESTAMPTZ NOT NULL
            )""",
        "CREATE INDEX IF NOT EXISTS idx_session_content_session ON session_content (session_id)",
    )
    async with db.acquire() as connection:
        for statement in ddl_statements:
            await connection.execute(statement)


def _remove_partial_publish(dest_dir: Path, content_id: str) -> None:
    """Best-effort removal of the directory left behind by a failed publish."""
    try:
        shutil.rmtree(dest_dir)
    except Exception:
        log.warning("content_store.cleanup_failed", content_id=content_id, exc_info=True)


async def publish(
    session_id: str,
    src_path: Path,
    filename: str | None = None,
    title: str | None = None,
    content_type: str | None = None,
) -> dict:
    """Copy a file into the content store and record metadata.

    Args:
        session_id: Session the content belongs to.
        src_path: File to publish; ``~`` is expanded and the path is resolved.
        filename: Name to store the file under. Only the final path component
            is used; defaults to the source file's name.
        title: Display title; defaults to the stored filename.
        content_type: Explicit content type; when omitted it is detected from
            the stored filename's extension.

    Returns:
        {"id": "...", "url": "...", "filename": "...", "content_type": "...",
        "title": "..."}

    Raises:
        FileNotFoundError: If the source does not exist or is not a regular file.
        Exception: Copy or database errors are re-raised after the partially
            published directory has been removed.
    """
    src = Path(src_path).expanduser().resolve()
    # is_file() rather than exists(): a directory cannot be copied with copy2
    # and would otherwise fail only after the destination had been created.
    if not src.is_file():
        raise FileNotFoundError(f"Source file not found: {src}")

    # Keep only the last path component so a name such as "../../x" or an
    # absolute path cannot place the copy outside this item's directory.
    dest_name = Path(filename).name if filename else src.name
    if dest_name in ("", ".", ".."):
        dest_name = src.name

    # The ID is only 8 hex characters, so collisions are possible. Creating
    # the directory exclusively guarantees it is new and empty, which also
    # means the destination file can never clobber an existing one.
    while True:
        content_id = str(uuid.uuid4())[:8]
        dest_dir = _CONTENT_DIR / content_id
        try:
            dest_dir.mkdir(parents=True)
        except FileExistsError:
            continue
        break
    dest = dest_dir / dest_name

    detected_type = content_type or _detect_content_type(dest.name)
    display_title = title or dest.name

    try:
        await asyncio.to_thread(shutil.copy2, str(src), str(dest))
    except Exception:
        log.warning("content_store.copy_failed", content_id=content_id, exc_info=True)
        _remove_partial_publish(dest_dir, content_id)
        raise

    # Record in DB
    try:
        pool = await _get_db_pool()
        async with pool.acquire() as conn:
            await conn.execute(
                """INSERT INTO session_content
                    (id, session_id, filename, content_type, title, created_at)
                   VALUES ($1, $2, $3, $4, $5, $6)""",
                content_id,
                session_id,
                dest.name,
                detected_type,
                display_title,
                datetime.now(timezone.utc),
            )
    except Exception:
        log.warning("content_store.db_insert_failed", content_id=content_id, exc_info=True)
        # Remove the copied file so untracked content is not left on disk.
        _remove_partial_publish(dest_dir, content_id)
        raise

    base_url = settings.public_url.rstrip("/")
    url = f"{base_url}/content/{content_id}/{dest.name}"

    log.info("content_store.published", content_id=content_id, type=detected_type, url=url)
    return {
        "id": content_id,
        "url": url,
        "filename": dest.name,
        "content_type": detected_type,
        "title": display_title,
    }


async def list_for_session(session_id: str) -> list[dict]:
    """Return all content items for a session, newest first.

    Each item is a dict with the keys ``id``, ``filename``, ``content_type``,
    ``title`` and ``created_at`` (ISO-8601 string when the stored value
    supports ``isoformat()``, otherwise its ``str()`` form).

    Returns an empty list when the database pool cannot be obtained; that
    failure is logged so it can be told apart from a session with no content.
    """
    try:
        pool = await _get_db_pool()
    except Exception:
        # Still best-effort, but no longer silent.
        log.warning("content_store.no_db", op="list_for_session", exc_info=True)
        return []
    async with pool.acquire() as conn:
        rows = await conn.fetch(
            "SELECT id, filename, content_type, title, created_at FROM session_content WHERE session_id=$1 ORDER BY created_at DESC",
            session_id,
        )
    return [
        {
            "id": r["id"],
            "filename": r["filename"],
            "content_type": r["content_type"],
            "title": r["title"],
            "created_at": r["created_at"].isoformat() if hasattr(r["created_at"], "isoformat") else str(r["created_at"]),
        }
        for r in rows
    ]


async def delete_content(content_id: str) -> bool:
    """Remove a content item from disk and DB.

    Args:
        content_id: ID of the item; must be a single plain directory name.

    Returns:
        True if a database row was deleted. False if the ID is invalid, no
        row matched, or the database delete failed. A failure to remove the
        directory is logged but does not by itself change the result.
    """
    # An empty ID or "." resolves to the store root, and ".." or anything
    # containing a path separator resolves outside the item's own directory.
    # rmtree on such a path would destroy unrelated files, so refuse early.
    if not content_id or content_id == ".." or Path(content_id).name != content_id:
        log.warning("content_store.invalid_content_id", content_id=content_id)
        return False

    dest_dir = _CONTENT_DIR / content_id
    try:
        if dest_dir.exists():
            await asyncio.to_thread(shutil.rmtree, str(dest_dir))
    except Exception:
        log.warning("content_store.rmtree_failed", content_id=content_id, exc_info=True)

    try:
        pool = await _get_db_pool()
        async with pool.acquire() as conn:
            result = await conn.execute("DELETE FROM session_content WHERE id=$1", content_id)
            # The command status looks like "DELETE <n>"; <n> is the row count.
            return int(result.split()[1]) > 0
    except Exception:
        log.warning("content_store.db_delete_failed", content_id=content_id, exc_info=True)
        return False


async def cleanup_old(days: int = 30) -> int:
    """Delete content older than N days. Returns count deleted.

    Args:
        days: Age threshold; items whose ``created_at`` is earlier than
            ``now (UTC) - days`` are deleted.

    Returns:
        Number of items for which ``delete_content`` reported success. 0 is
        also returned when the database pool cannot be obtained; that failure
        is logged so a cleanup that never ran is not mistaken for "nothing to
        delete".
    """
    cutoff = datetime.now(timezone.utc) - timedelta(days=days)
    try:
        pool = await _get_db_pool()
    except Exception:
        log.warning("content_store.no_db", op="cleanup_old", exc_info=True)
        return 0
    # The connection is released before deleting: delete_content acquires
    # its own connection for every item.
    async with pool.acquire() as conn:
        rows = await conn.fetch(
            "SELECT id FROM session_content WHERE created_at < $1", cutoff
        )
    deleted = 0
    for r in rows:
        if await delete_content(r["id"]):
            deleted += 1
    return deleted