"""Session-scoped file storage.
Session directories live as long as the session itself exists in the database.
They are removed only when the session is explicitly deleted (DELETE /sessions/{id})
or when an orphaned directory is found (session no longer in DB).
"""
import asyncio
import shutil
from datetime import datetime, timezone
from pathlib import Path
import structlog
from navi.config import settings
log = structlog.get_logger()
# Extensions that must never be uploaded
_FORBIDDEN_EXTENSIONS = {
".exe", ".dll", ".so", ".dylib",
".sh", ".bash", ".zsh", ".fish",
".bat", ".cmd", ".ps1", ".vbs",
".com", ".msi", ".app", ".deb", ".rpm",
".bin", ".elf",
}
def session_dir(session_id: str) -> Path:
return Path(settings.session_files_dir) / session_id
def ensure_session_dir(session_id: str) -> Path:
d = session_dir(session_id)
d.mkdir(parents=True, exist_ok=True)
return d
def is_forbidden(filename: str) -> bool:
return Path(filename).suffix.lower() in _FORBIDDEN_EXTENSIONS
def safe_filename(name: str) -> str:
"""Strip directory components and replace problematic chars."""
name = Path(name).name # strip any path components
name = name.replace("/", "_").replace("\\", "_").replace("\x00", "_")
return name or "upload"
async def delete_session_dir(session_id: str) -> None:
d = session_dir(session_id)
if d.exists():
await asyncio.to_thread(shutil.rmtree, d)
log.info("session_files.deleted", session_id=session_id)
async def cleanup_orphans(store) -> None:
"""Remove session file dirs whose session no longer exists in the database."""
root = Path(settings.session_files_dir)
if not root.exists():
return
entries = await asyncio.to_thread(lambda: list(root.iterdir()))
for d in entries:
if not d.is_dir():
continue
session_id = d.name
try:
session = await store.get(session_id)
except Exception:
continue
if session is None:
await asyncio.to_thread(shutil.rmtree, d)
log.info("session_files.orphan_deleted", session_id=session_id)
def list_session_files(session_id: str, max_depth: int = 10) -> list[dict]:
"""Recursively list files and directories in the session directory.
Returns a flat list ordered by path, each item containing:
{ path, size, is_dir, modified_at }.
Hidden files (starting with '.') are included.
"""
base = session_dir(session_id)
if not base.exists():
return []
out = []
_scan_dir(base, base, out, current_depth=0, max_depth=max_depth)
return out
def _scan_dir(root: Path, current: Path, out: list, current_depth: int, max_depth: int) -> None:
if current_depth > max_depth:
return
try:
entries = sorted(current.iterdir(), key=lambda e: (not e.is_dir(), e.name.lower()))
except (PermissionError, OSError):
return
for entry in entries:
try:
stat = entry.stat()
rel = entry.relative_to(root).as_posix()
out.append({
"path": rel,
"size": stat.st_size if entry.is_file() else 0,
"is_dir": entry.is_dir(),
"modified_at": datetime.fromtimestamp(stat.st_mtime, timezone.utc).isoformat(),
})
if entry.is_dir():
_scan_dir(root, entry, out, current_depth + 1, max_depth)
except (PermissionError, OSError):
continue
async def cleanup_loop(store) -> None:
"""Background task: remove orphaned session file dirs every hour."""
while True:
try:
await cleanup_orphans(store)
except Exception:
log.exception("session_files.cleanup_error")
await asyncio.sleep(3600)