"""Per-session extraction tracking — avoids re-processing the same session."""
from datetime import datetime, timezone
class SessionStateMixin:
"""Session extraction state.
Expected on the composite class:
_get_pool() -> asyncpg.Pool
"""
async def mark_session_extracted(self, session_id: str) -> None:
now = datetime.now(timezone.utc)
pool = await self._get_pool()
async with pool.acquire() as conn:
await conn.execute(
"""INSERT INTO session_memory_state (session_id, extracted_at) VALUES ($1, $2)
ON CONFLICT(session_id) DO UPDATE SET extracted_at=EXCLUDED.extracted_at""",
session_id, now,
)
async def get_extracted_at(self, session_id: str) -> str | None:
pool = await self._get_pool()
async with pool.acquire() as conn:
row = await conn.fetchrow(
"SELECT extracted_at FROM session_memory_state WHERE session_id=$1", session_id
)
if row is None:
return None
val = row["extracted_at"]
return val.isoformat() if hasattr(val, "isoformat") else str(val)