"""asyncpg helpers for the eval system.
Phase 1 surface: message feedback (like / dislike / clear). Phase 2 will extend
this module with evaluation queries.
Schema is applied lazily on the first pool acquire — same pattern as
navi/core/pg_session_store.py. The full DDL lives in schema.sql alongside.
"""
from __future__ import annotations
import asyncio
from pathlib import Path
import asyncpg
_SCHEMA_PATH = Path(__file__).parent / "schema.sql"
class EvalDB:
"""Owns its own asyncpg pool. Reuses settings.database_url."""
def __init__(self, dsn: str) -> None:
self._dsn = dsn
self._pool: asyncpg.Pool | None = None
self._lock = asyncio.Lock()
async def _get_pool(self) -> asyncpg.Pool:
if self._pool is not None:
return self._pool
async with self._lock:
if self._pool is None:
pool = await asyncpg.create_pool(self._dsn)
async with pool.acquire() as conn:
await conn.execute(_SCHEMA_PATH.read_text(encoding="utf-8"))
self._pool = pool
return self._pool
# ── Feedback ──────────────────────────────────────────────────────────
async def set_feedback(
self, session_id: str, message_index: int, rating: int
) -> None:
if rating not in (-1, 1):
raise ValueError("rating must be -1 or 1")
pool = await self._get_pool()
async with pool.acquire() as conn:
await conn.execute(
"""
INSERT INTO message_feedback (session_id, message_index, rating)
VALUES ($1, $2, $3)
ON CONFLICT (session_id, message_index)
DO UPDATE SET rating = EXCLUDED.rating, updated_at = now()
""",
session_id, message_index, rating,
)
async def clear_feedback(self, session_id: str, message_index: int) -> None:
pool = await self._get_pool()
async with pool.acquire() as conn:
await conn.execute(
"DELETE FROM message_feedback WHERE session_id = $1 AND message_index = $2",
session_id, message_index,
)
async def list_feedback(self, session_id: str) -> list[dict]:
pool = await self._get_pool()
async with pool.acquire() as conn:
rows = await conn.fetch(
"SELECT message_index, rating, created_at, updated_at "
"FROM message_feedback WHERE session_id = $1 ORDER BY message_index",
session_id,
)
return [
{
"message_index": r["message_index"],
"rating": r["rating"],
"created_at": r["created_at"].isoformat(),
"updated_at": r["updated_at"].isoformat(),
}
for r in rows
]