"""Database backup and restore helpers."""
import os
import shutil
import subprocess
from datetime import UTC, datetime
from pathlib import Path
from gnexus_creds.config import Settings, get_settings
def _backup_dir(settings: Settings | None = None) -> Path:
settings = settings or get_settings()
path = Path(settings.backup_dir)
path.mkdir(parents=True, exist_ok=True)
return path
def _normalize_pg_url(url: str) -> str:
"""Strip SQLAlchemy driver suffix so pg_dump/psql accept the URL."""
return url.replace("postgresql+psycopg://", "postgresql://")
def create_backup(settings: Settings | None = None) -> dict:
"""Create a database backup and return metadata."""
settings = settings or get_settings()
url = settings.database_url
timestamp = datetime.now(UTC).strftime("%Y%m%d_%H%M%S")
backup_dir = _backup_dir(settings)
if url.startswith("sqlite"):
db_path = url.replace("sqlite:///", "").replace("sqlite://", "")
filename = f"backup_{timestamp}.db"
dest = backup_dir / filename
shutil.copy2(db_path, dest)
return {
"filename": filename,
"path": str(dest),
"size": dest.stat().st_size,
"created_at": datetime.now(UTC).isoformat(),
"engine": "sqlite",
}
if url.startswith("postgresql"):
filename = f"backup_{timestamp}.sql"
dest = backup_dir / filename
normalized = _normalize_pg_url(url)
result = subprocess.run(
["pg_dump", "--if-exists", "--clean", "--no-owner", normalized],
capture_output=True,
text=True,
)
if result.returncode != 0:
raise RuntimeError(f"pg_dump failed: {result.stderr}")
dest.write_text(result.stdout, encoding="utf-8")
return {
"filename": filename,
"path": str(dest),
"size": dest.stat().st_size,
"created_at": datetime.now(UTC).isoformat(),
"engine": "postgresql",
}
raise RuntimeError(f"Unsupported database URL: {url}")
def list_backups(settings: Settings | None = None) -> list[dict]:
"""List available backup files."""
backup_dir = _backup_dir(settings)
backups = []
for path in sorted(backup_dir.iterdir(), key=lambda p: p.stat().st_mtime, reverse=True):
if path.is_file() and path.name.startswith("backup_"):
backups.append(
{
"filename": path.name,
"path": str(path),
"size": path.stat().st_size,
"created_at": datetime.fromtimestamp(path.stat().st_mtime, UTC).isoformat(),
}
)
return backups
def restore_backup(file_path: str | Path, settings: Settings | None = None) -> dict:
"""Restore database from a backup file."""
settings = settings or get_settings()
url = settings.database_url
path = Path(file_path)
if not path.exists():
raise FileNotFoundError(f"Backup file not found: {path}")
if url.startswith("sqlite"):
db_path = url.replace("sqlite:///", "").replace("sqlite://", "")
shutil.copy2(str(path), db_path)
return {"engine": "sqlite", "restored": db_path}
if url.startswith("postgresql"):
normalized = _normalize_pg_url(url)
result = subprocess.run(
["psql", normalized, "-f", str(path)],
capture_output=True,
text=True,
)
if result.returncode != 0:
raise RuntimeError(f"psql failed: {result.stderr}")
return {"engine": "postgresql", "restored": normalized}
raise RuntimeError(f"Unsupported database URL: {url}")