"""SQLite persistence for resume / deduplication."""
import sqlite3
import time
from contextlib import closing
from pathlib import Path
from typing import List, Optional, Set
class ResumeStorage:
    """Track which realty_ids have already been processed.

    Every operation opens a short-lived SQLite connection to ``db_path`` and
    closes it again before returning, so no connection or file handle
    outlives a method call.

    Schema:
        processed_realties
            - realty_id   TEXT PRIMARY KEY
            - ingested_at INTEGER (unix timestamp)
            - job_id      INTEGER (data_collector job_id if available)
    """

    def __init__(self, db_path: str = "domria_resume.db"):
        """Remember the database location and make sure the schema exists.

        Args:
            db_path: Path of the SQLite database file.
        """
        self.db_path = Path(db_path)
        self._init_db()

    def _connect(self) -> "closing[sqlite3.Connection]":
        """Return a context manager yielding a connection that is closed on exit.

        ``with sqlite3.connect(...) as conn`` only commits or rolls back the
        transaction; it never closes the connection.  Wrapping the connection
        in ``closing`` guarantees the handle is released when the block ends.
        """
        return closing(sqlite3.connect(self.db_path))

    def _init_db(self) -> None:
        """Create the ``processed_realties`` table and its index if missing."""
        # The second context manager (``conn`` itself) commits on success and
        # rolls back on error; ``closing`` then closes the connection.
        with self._connect() as conn, conn:
            conn.execute(
                """
                CREATE TABLE IF NOT EXISTS processed_realties (
                    realty_id TEXT PRIMARY KEY,
                    ingested_at INTEGER NOT NULL,
                    job_id INTEGER
                )
                """
            )
            conn.execute(
                """
                CREATE INDEX IF NOT EXISTS idx_ingested_at
                ON processed_realties(ingested_at)
                """
            )

    def is_processed(self, realty_id: str) -> bool:
        """Return True if ``realty_id`` has a row in ``processed_realties``."""
        with self._connect() as conn:
            row = conn.execute(
                "SELECT 1 FROM processed_realties WHERE realty_id = ?",
                (realty_id,),
            ).fetchone()
        return row is not None

    def mark_processed(
        self, realty_id: str, job_id: Optional[int] = None
    ) -> None:
        """Record ``realty_id`` as processed at the current time.

        Marking an id that is already stored replaces the existing row, so its
        ``ingested_at`` and ``job_id`` are overwritten with the new values.

        Args:
            realty_id: Identifier to record.
            job_id: Optional job id stored alongside the record.
        """
        with self._connect() as conn, conn:
            conn.execute(
                """
                INSERT OR REPLACE INTO processed_realties (realty_id, ingested_at, job_id)
                VALUES (?, ?, ?)
                """,
                (realty_id, int(time.time()), job_id),
            )

    def get_all_ids(self) -> Set[str]:
        """Return the set of every stored ``realty_id``."""
        with self._connect() as conn:
            rows = conn.execute(
                "SELECT realty_id FROM processed_realties"
            ).fetchall()
        return {row[0] for row in rows}

    def stats(self) -> dict:
        """Return row counts as ``{"total": int, "today": int}``.

        ``today`` counts rows whose ``ingested_at`` falls on the current date
        as computed by SQLite's ``date('now')``, i.e. the UTC calendar day.
        """
        with self._connect() as conn:
            total = conn.execute(
                "SELECT COUNT(*) FROM processed_realties"
            ).fetchone()[0]
            today = conn.execute(
                "SELECT COUNT(*) FROM processed_realties WHERE date(ingested_at, 'unixepoch') = date('now')"
            ).fetchone()[0]
        return {"total": total, "today": today}