Newer
Older
navi-1 / navi / session_files.py
@Eugene Sukhodolskiy Eugene Sukhodolskiy on 24 Apr 2 KB Make server I/O non-blocking; update docs
"""Session-scoped file storage.

Session directories live as long as the session itself exists in the database.
They are removed only when the session is explicitly deleted (DELETE /sessions/{id})
or when an orphaned directory is found (session no longer in DB).
"""

import asyncio
import shutil
from pathlib import Path

import structlog

from navi.config import settings

log = structlog.get_logger()

# Extensions that must never be uploaded
_FORBIDDEN_EXTENSIONS = {
    ".exe", ".dll", ".so", ".dylib",
    ".sh", ".bash", ".zsh", ".fish",
    ".bat", ".cmd", ".ps1", ".vbs",
    ".com", ".msi", ".app", ".deb", ".rpm",
    ".bin", ".elf",
}


def session_dir(session_id: str) -> Path:
    return Path(settings.session_files_dir) / session_id


def ensure_session_dir(session_id: str) -> Path:
    d = session_dir(session_id)
    d.mkdir(parents=True, exist_ok=True)
    return d


def is_forbidden(filename: str) -> bool:
    return Path(filename).suffix.lower() in _FORBIDDEN_EXTENSIONS


def safe_filename(name: str) -> str:
    """Strip directory components and replace problematic chars."""
    name = Path(name).name  # strip any path components
    name = name.replace("/", "_").replace("\\", "_").replace("\x00", "_")
    return name or "upload"


async def delete_session_dir(session_id: str) -> None:
    d = session_dir(session_id)
    if d.exists():
        await asyncio.to_thread(shutil.rmtree, d)
        log.info("session_files.deleted", session_id=session_id)


async def cleanup_orphans(store) -> None:
    """Remove session file dirs whose session no longer exists in the database."""
    root = Path(settings.session_files_dir)
    if not root.exists():
        return

    entries = await asyncio.to_thread(lambda: list(root.iterdir()))
    for d in entries:
        if not d.is_dir():
            continue
        session_id = d.name
        try:
            session = await store.get(session_id)
        except Exception:
            continue

        if session is None:
            await asyncio.to_thread(shutil.rmtree, d)
            log.info("session_files.orphan_deleted", session_id=session_id)


async def cleanup_loop(store) -> None:
    """Background task: remove orphaned session file dirs every hour."""
    while True:
        try:
            await cleanup_orphans(store)
        except Exception:
            log.exception("session_files.cleanup_error")
        await asyncio.sleep(3600)