# File: vmk-360-domria_parser/src/storage.py
"""SQLite persistence for resume / deduplication."""
import sqlite3
import time
from contextlib import contextmanager
from pathlib import Path
from typing import Iterator, List, Optional, Set


class ResumeStorage:
    """SQLite-backed record of realty IDs that have already been processed.

    Each public method opens a short-lived connection to ``db_path``, runs
    its statements inside a transaction, and closes the connection before
    returning.

    Schema:
        processed_realties
            - realty_id   TEXT PRIMARY KEY
            - ingested_at INTEGER NOT NULL (unix timestamp)
            - job_id      INTEGER (data_collector job_id if available)
    """

    def __init__(self, db_path: str = "domria_resume.db"):
        """Store the database location and make sure the schema exists.

        Args:
            db_path: Path of the SQLite database file.
        """
        self.db_path = Path(db_path)
        self._init_db()

    @contextmanager
    def _connect(self) -> Iterator[sqlite3.Connection]:
        """Yield a connection that is committed (or rolled back) and closed.

        ``sqlite3.Connection`` used as a context manager only ends the
        transaction; it does not close the connection. Closing explicitly
        here keeps file handles from piling up until garbage collection.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            with conn:  # commit on success, roll back if the body raises
                yield conn
        finally:
            conn.close()

    def _init_db(self) -> None:
        """Create the ``processed_realties`` table and its index if absent."""
        with self._connect() as conn:
            conn.execute(
                """
                CREATE TABLE IF NOT EXISTS processed_realties (
                    realty_id TEXT PRIMARY KEY,
                    ingested_at INTEGER NOT NULL,
                    job_id INTEGER
                )
                """
            )
            conn.execute(
                """
                CREATE INDEX IF NOT EXISTS idx_ingested_at
                ON processed_realties(ingested_at)
                """
            )

    def is_processed(self, realty_id: str) -> bool:
        """Return ``True`` if ``realty_id`` has a row in the table."""
        with self._connect() as conn:
            row = conn.execute(
                "SELECT 1 FROM processed_realties WHERE realty_id = ?",
                (realty_id,),
            ).fetchone()
        return row is not None

    def mark_processed(
        self, realty_id: str, job_id: Optional[int] = None
    ) -> None:
        """Record ``realty_id`` as processed at the current time.

        Marking an ID that is already present replaces its row, so the
        stored timestamp and ``job_id`` reflect the latest call.

        Args:
            realty_id: Identifier to record.
            job_id: Optional job identifier stored alongside the row.
        """
        with self._connect() as conn:
            conn.execute(
                """
                INSERT OR REPLACE INTO processed_realties (realty_id, ingested_at, job_id)
                VALUES (?, ?, ?)
                """,
                (realty_id, int(time.time()), job_id),
            )

    def get_all_ids(self) -> Set[str]:
        """Return every stored ``realty_id`` as a set."""
        with self._connect() as conn:
            rows = conn.execute("SELECT realty_id FROM processed_realties").fetchall()
        return {row[0] for row in rows}

    def stats(self) -> dict:
        """Return row counts as ``{"total": int, "today": int}``.

        ``today`` counts rows whose ``ingested_at`` falls on the current
        date as SQLite computes it; both ``date(..., 'unixepoch')`` and
        ``date('now')`` are evaluated in UTC.
        """
        with self._connect() as conn:
            total = conn.execute(
                "SELECT COUNT(*) FROM processed_realties"
            ).fetchone()[0]
            today = conn.execute(
                "SELECT COUNT(*) FROM processed_realties WHERE date(ingested_at, 'unixepoch') = date('now')"
            ).fetchone()[0]
        return {"total": total, "today": today}