diff --git a/debug/eval/api.py b/debug/eval/api.py index 61336c6..e652bbc 100644 --- a/debug/eval/api.py +++ b/debug/eval/api.py @@ -4,12 +4,26 @@ from typing import Literal -from fastapi import APIRouter, HTTPException +from fastapi import APIRouter, HTTPException, Query from pydantic import BaseModel, Field +from navi.api.deps import ( + get_backend_registry, + get_profile_registry, + get_session_store, +) from navi.config import settings from .db import EvalDB +from .judge import JUDGE_VERSION, RUBRIC_VERSION +from .runner import get_registry, start_run +from .schema import ( + RunRequest, + RunStatus, + SessionDetail, + SessionOverview, + StatsResponse, +) router = APIRouter(prefix="/eval", tags=["eval"]) @@ -30,6 +44,9 @@ return _db +# ── Feedback (Phase 1) ─────────────────────────────────────────────────── + + class FeedbackIn(BaseModel): session_id: str message_index: int = Field(ge=0) @@ -53,3 +70,94 @@ async def list_feedback(session_id: str) -> dict: db = _get_db() return {"feedback": await db.list_feedback(session_id)} + + +# ── Sessions overview / detail ─────────────────────────────────────────── + + +@router.get("/sessions", response_model=list[SessionOverview]) +async def list_sessions( + limit: int = Query(50, ge=1, le=500), + offset: int = Query(0, ge=0), + profile: str | None = None, + status: Literal["evaluated", "pending", "stale"] | None = None, +) -> list[SessionOverview]: + db = _get_db() + rows = await db.list_sessions_overview( + judge_version=JUDGE_VERSION, + rubric_version=RUBRIC_VERSION, + limit=limit, + offset=offset, + profile=profile, + status=status, + ) + return [SessionOverview.model_validate(r) for r in rows] + + +@router.get("/sessions/{session_id}", response_model=SessionDetail) +async def get_session_detail(session_id: str) -> SessionDetail: + db = _get_db() + session_store = get_session_store() + session = await session_store.get(session_id) + if session is None: + raise HTTPException(404, f"session not found: {session_id}") 
+ + feedback = await db.list_feedback(session_id) + evaluations = await db.list_evaluations(session_id) + + return SessionDetail( + session_id=session.id, + profile_id=session.profile_id, + name=session.name, + created_at=session.created_at, + last_active=session.last_active, + msg_count=len(session.messages), + feedback=feedback, + evaluations=evaluations, + ) + + +# ── Stats ──────────────────────────────────────────────────────────────── + + +@router.get("/stats", response_model=StatsResponse) +async def get_stats( + days: int = Query(30, ge=1, le=365), + by_complexity_bucket: bool = False, +) -> StatsResponse: + db = _get_db() + raw = await db.aggregate_stats( + judge_version=JUDGE_VERSION, + rubric_version=RUBRIC_VERSION, + days=days, + by_complexity_bucket=by_complexity_bucket, + ) + return StatsResponse.model_validate(raw) + + +# ── Run trigger / status (background tasks) ───────────────────────────── + + +@router.post("/run", response_model=RunStatus) +async def trigger_run(req: RunRequest) -> RunStatus: + db = _get_db() + return start_run( + req=req, + db=db, + session_store=get_session_store(), + backend_registry=get_backend_registry(), + profile_registry=get_profile_registry(), + ) + + +@router.get("/run/{run_id}", response_model=RunStatus) +async def get_run_status(run_id: str) -> RunStatus: + status = get_registry().get(run_id) + if status is None: + raise HTTPException(404, f"run not found: {run_id}") + return status + + +@router.get("/runs", response_model=list[RunStatus]) +async def list_runs() -> list[RunStatus]: + return get_registry().list_runs() diff --git a/debug/eval/db.py b/debug/eval/db.py index a7a4d06..9854821 100644 --- a/debug/eval/db.py +++ b/debug/eval/db.py @@ -178,3 +178,289 @@ expected_experts, ) return {r["session_id"] for r in rows} + + # ── Aggregations for the read endpoints ──────────────────────────────── + + async def list_sessions_overview( + self, + *, + judge_version: str, + rubric_version: str, + limit: int = 50, 
+ offset: int = 0, + profile: str | None = None, + status: str | None = None, # "evaluated" | "pending" | "stale" | None + ) -> list[dict]: + """One row per session with feedback counts and the latest eval summary. + + Joined in a single query against `sessions`, `message_feedback`, + `evaluations`. Status is derived against the current pinned versions. + """ + pool = await self._get_pool() + clauses: list[str] = [] + params: list = [] + + # judge_version / rubric_version are only bound for the "evaluated" and "stale" filters; + # otherwise we don't bind them as parameters (Postgres would error on + # unused-parameter type inference). + if status == "evaluated": + params.extend([judge_version, rubric_version]) + clauses.append( + f"latest.judge_version = ${len(params) - 1} " + f"AND latest.rubric_version = ${len(params)}" + ) + elif status == "pending": + clauses.append("latest.eval_run_id IS NULL") + elif status == "stale": + params.extend([judge_version, rubric_version]) + clauses.append( + "latest.eval_run_id IS NOT NULL AND " + f"(latest.judge_version <> ${len(params) - 1} " + f"OR latest.rubric_version <> ${len(params)})" + ) + + if profile is not None: + params.append(profile) + clauses.append(f"s.profile_id = ${len(params)}") + + where = ("WHERE " + " AND ".join(clauses)) if clauses else "" + params.extend([limit, offset]) + + sql = f""" + WITH latest_run AS ( + SELECT DISTINCT ON (session_id) + session_id, eval_run_id, eval_date, judge_version, rubric_version + FROM evaluations + ORDER BY session_id, eval_date DESC + ), + run_avg AS ( + SELECT e.eval_run_id, + AVG((e.scores->>'task_complexity')::numeric) AS task_complexity, + AVG((e.scores->>'goal_completion')::numeric) AS goal_completion, + AVG((e.scores->>'tool_usage_quality')::numeric) AS tool_usage_quality, + AVG((e.scores->>'efficiency')::numeric) AS efficiency, + AVG((e.scores->>'communication')::numeric) AS communication, + AVG(NULLIF(e.scores->>'subagent_orchestration','null')::numeric) AS subagent_orchestration, 
+ AVG(NULLIF(e.scores->>'self_extension','null')::numeric) AS self_extension, + COUNT(*) AS expert_count + FROM evaluations e + JOIN latest_run lr ON lr.eval_run_id = e.eval_run_id + GROUP BY e.eval_run_id + ), + feedback_counts AS ( + SELECT session_id, + SUM(CASE WHEN rating = 1 THEN 1 ELSE 0 END)::int AS likes, + SUM(CASE WHEN rating = -1 THEN 1 ELSE 0 END)::int AS dislikes + FROM message_feedback + GROUP BY session_id + ) + SELECT + s.id, s.profile_id, s.name, s.created_at, s.last_active, s.pinned, + COALESCE(jsonb_array_length(s.messages::jsonb), 0) AS msg_count, + COALESCE(fc.likes, 0) AS likes, + COALESCE(fc.dislikes, 0) AS dislikes, + latest.eval_run_id AS latest_eval_run_id, + latest.eval_date AS latest_eval_date, + latest.judge_version AS latest_judge_version, + latest.rubric_version AS latest_rubric_version, + ra.task_complexity, ra.goal_completion, ra.tool_usage_quality, + ra.efficiency, ra.communication, + ra.subagent_orchestration, ra.self_extension, + ra.expert_count + FROM sessions s + LEFT JOIN feedback_counts fc ON fc.session_id = s.id + LEFT JOIN latest_run latest ON latest.session_id = s.id + LEFT JOIN run_avg ra ON ra.eval_run_id = latest.eval_run_id + {where} + ORDER BY s.pinned DESC, s.last_active DESC + LIMIT ${len(params) - 1} OFFSET ${len(params)} + """ + + async with pool.acquire() as conn: + rows = await conn.fetch(sql, *params) + + result = [] + for r in rows: + if r["latest_eval_run_id"] is None: + eval_status = "pending" + elif ( + r["latest_judge_version"] == judge_version + and r["latest_rubric_version"] == rubric_version + ): + eval_status = "evaluated" + else: + eval_status = "stale" + + avg = None + if r["latest_eval_run_id"] is not None: + avg = { + "task_complexity": _round_or_none(r["task_complexity"]), + "goal_completion": _round_or_none(r["goal_completion"]), + "tool_usage_quality": _round_or_none(r["tool_usage_quality"]), + "efficiency": _round_or_none(r["efficiency"]), + "communication": _round_or_none(r["communication"]), 
+ "subagent_orchestration": _round_or_none(r["subagent_orchestration"]), + "self_extension": _round_or_none(r["self_extension"]), + } + + result.append( + { + "session_id": r["id"], + "profile_id": r["profile_id"], + "name": r["name"], + "created_at": r["created_at"], + "last_active": r["last_active"], + "pinned": r["pinned"], + "msg_count": r["msg_count"], + "likes": r["likes"], + "dislikes": r["dislikes"], + "eval_status": eval_status, + "latest_avg": avg, + "latest_eval_date": r["latest_eval_date"], + "latest_judge_version": r["latest_judge_version"], + "latest_rubric_version": r["latest_rubric_version"], + } + ) + return result + + async def aggregate_stats( + self, + *, + judge_version: str, + rubric_version: str, + days: int = 30, + by_complexity_bucket: bool = False, + ) -> dict: + """Per-axis averages grouped by calendar week (date_trunc('week')) over the last `days` days. + + Returns: + { + buckets: [bucket_label, ...], # ["overall"] or ["0-25", ...] + weekly: [ + {week_start, bucket, axis_means: {...}, sample_count} + ] + } + Bucket is computed from task_complexity for each session's latest run; when bucketed, `weekly` holds the "overall" rows followed by the per-bucket rows. 
+ """ + pool = await self._get_pool() + async with pool.acquire() as conn: + rows = await conn.fetch( + """ + WITH latest_run AS ( + SELECT DISTINCT ON (session_id) + session_id, eval_run_id, eval_date + FROM evaluations + WHERE judge_version = $1 AND rubric_version = $2 + AND eval_date >= now() - ($3::text || ' days')::interval + ORDER BY session_id, eval_date DESC + ) + SELECT + date_trunc('week', lr.eval_date) AS week_start, + AVG((e.scores->>'task_complexity')::numeric) AS task_complexity, + AVG((e.scores->>'goal_completion')::numeric) AS goal_completion, + AVG((e.scores->>'tool_usage_quality')::numeric) AS tool_usage_quality, + AVG((e.scores->>'efficiency')::numeric) AS efficiency, + AVG((e.scores->>'communication')::numeric) AS communication, + AVG(NULLIF(e.scores->>'subagent_orchestration','null')::numeric) AS subagent_orchestration, + AVG(NULLIF(e.scores->>'self_extension','null')::numeric) AS self_extension, + COUNT(DISTINCT lr.session_id) AS sample_count + FROM evaluations e + JOIN latest_run lr ON lr.eval_run_id = e.eval_run_id + GROUP BY week_start + ORDER BY week_start + """, + judge_version, + rubric_version, + str(days), + ) + + weekly = [ + { + "week_start": r["week_start"].isoformat(), + "bucket": "overall", + "sample_count": r["sample_count"], + "axis_means": { + "task_complexity": _round_or_none(r["task_complexity"]), + "goal_completion": _round_or_none(r["goal_completion"]), + "tool_usage_quality": _round_or_none(r["tool_usage_quality"]), + "efficiency": _round_or_none(r["efficiency"]), + "communication": _round_or_none(r["communication"]), + "subagent_orchestration": _round_or_none(r["subagent_orchestration"]), + "self_extension": _round_or_none(r["self_extension"]), + }, + } + for r in rows + ] + + # Optional bucket split — second query, grouped by complexity bucket. 
+ if by_complexity_bucket: + async with pool.acquire() as conn: + bucket_rows = await conn.fetch( + """ + WITH latest_run AS ( + SELECT DISTINCT ON (session_id) + session_id, eval_run_id, eval_date + FROM evaluations + WHERE judge_version = $1 AND rubric_version = $2 + AND eval_date >= now() - ($3::text || ' days')::interval + ORDER BY session_id, eval_date DESC + ), + run_avg AS ( + SELECT + lr.session_id, + date_trunc('week', lr.eval_date) AS week_start, + AVG((e.scores->>'task_complexity')::numeric) AS task_complexity, + AVG((e.scores->>'goal_completion')::numeric) AS goal_completion, + AVG((e.scores->>'tool_usage_quality')::numeric) AS tool_usage_quality, + AVG((e.scores->>'efficiency')::numeric) AS efficiency, + AVG((e.scores->>'communication')::numeric) AS communication + FROM evaluations e + JOIN latest_run lr ON lr.eval_run_id = e.eval_run_id + GROUP BY lr.session_id, week_start + ) + SELECT + week_start, + CASE + WHEN task_complexity <= 25 THEN '0-25' + WHEN task_complexity <= 50 THEN '26-50' + WHEN task_complexity <= 75 THEN '51-75' + ELSE '76+' + END AS bucket, + AVG(task_complexity) AS task_complexity, + AVG(goal_completion) AS goal_completion, + AVG(tool_usage_quality) AS tool_usage_quality, + AVG(efficiency) AS efficiency, + AVG(communication) AS communication, + COUNT(*) AS sample_count + FROM run_avg + GROUP BY week_start, bucket + ORDER BY week_start, bucket + """, + judge_version, + rubric_version, + str(days), + ) + weekly.extend( + { + "week_start": r["week_start"].isoformat(), + "bucket": r["bucket"], + "sample_count": r["sample_count"], + "axis_means": { + "task_complexity": _round_or_none(r["task_complexity"]), + "goal_completion": _round_or_none(r["goal_completion"]), + "tool_usage_quality": _round_or_none(r["tool_usage_quality"]), + "efficiency": _round_or_none(r["efficiency"]), + "communication": _round_or_none(r["communication"]), + }, + } + for r in bucket_rows + ) + + buckets = ["0-25", "26-50", "51-75", "76+"] if by_complexity_bucket 
else ["overall"] + return {"buckets": buckets, "weekly": weekly} + + +def _round_or_none(v) -> int | None: + if v is None: + return None + return round(float(v)) diff --git a/debug/eval/runner.py b/debug/eval/runner.py new file mode 100644 index 0000000..4c6252e --- /dev/null +++ b/debug/eval/runner.py @@ -0,0 +1,183 @@ +"""Background eval runner used by both the CLI and the REST endpoint. + +Owns an in-memory registry of in-flight runs keyed by run_id (uuid). The +caller starts a run via `start_run(...)`, then polls `get_registry().get(run_id)` for +status. Runs persist their results to postgres via EvalDB; the in-memory +registry is purely for live-progress reporting. +""" + +from __future__ import annotations + +import asyncio +import traceback +from datetime import datetime, timezone +from uuid import uuid4 + +from .db import EvalDB +from .judge import ( + JUDGE_VERSION, + RUBRIC_VERSION, + average_scores, + evaluate_session, +) +from .schema import RunRequest, RunSessionStatus, RunStatus + + +class _RunRegistry: + """Single-process in-memory registry. 
Cleared on server restart; this class has no eviction, so entries live until then.""" + + def __init__(self) -> None: + self._runs: dict[str, RunStatus] = {} + self._tasks: dict[str, asyncio.Task] = {} + + def register(self, status: RunStatus) -> None: + self._runs[status.run_id] = status + + def get(self, run_id: str) -> RunStatus | None: + return self._runs.get(run_id) + + def list_runs(self) -> list[RunStatus]: + return sorted(self._runs.values(), key=lambda r: r.started_at, reverse=True) + + def attach_task(self, run_id: str, task: asyncio.Task) -> None: + self._tasks[run_id] = task + + +_registry = _RunRegistry() + + +def get_registry() -> _RunRegistry: + return _registry + + +# ── Picking sessions ──────────────────────────────────────────────────── + + +async def _resolve_sessions(req: RunRequest, db: EvalDB, session_store): + if req.scope == "session": + if not req.session_id: + raise ValueError("scope=session requires session_id") + s = await session_store.get(req.session_id) + if s is None: + raise ValueError(f"session not found: {req.session_id}") + return [s] + + sessions = await session_store.list_all() + + if req.scope == "unevaluated": + already = await db.evaluated_session_ids(JUDGE_VERSION, RUBRIC_VERSION) + sessions = [s for s in sessions if s.id not in already] + + if req.since is not None: + sessions = [s for s in sessions if s.created_at >= req.since] + if req.limit is not None: + sessions = sessions[: req.limit] + return sessions + + +# ── The actual loop ───────────────────────────────────────────────────── + + +async def _run_loop( + *, + run_id: str, + req: RunRequest, + db: EvalDB, + session_store, + backend_registry, + profile_registry, +) -> None: + status = _registry.get(run_id) + if status is None: + return + + try: + sessions = await _resolve_sessions(req, db, session_store) + status.sessions = [ + RunSessionStatus(session_id=s.id, state="pending") for s in sessions + ] + except Exception as e: + status.state = "failed" + status.finished_at = datetime.now(timezone.utc) + status.sessions = 
[RunSessionStatus(session_id="", state="failed", error=str(e))] + return + + try: + llm = backend_registry.get(req.backend) + except Exception as e: + status.state = "failed" + status.finished_at = datetime.now(timezone.utc) + for s in status.sessions: + s.state = "failed" + s.error = f"backend not available: {e}" + return + + by_id = {s.id: s for s in sessions} + for entry in status.sessions: + session = by_id.get(entry.session_id) + if session is None: + entry.state = "failed" + entry.error = "session vanished mid-run" + continue + entry.state = "running" + try: + feedback = await db.feedback_by_index(session.id) + try: + profile = profile_registry.get(session.profile_id) + except Exception: + profile = None + metadata, results = await evaluate_session( + session=session, + feedback_by_index=feedback, + profile=profile, + llm=llm, + model=req.model, + ) + await db.insert_evaluation_run(metadata, session.id, results) + entry.avg = average_scores(results).model_dump() + entry.state = "ok" + except Exception as e: + entry.state = "failed" + entry.error = f"{type(e).__name__}: {e}" + traceback.print_exc() + + status.state = "done" + status.finished_at = datetime.now(timezone.utc) + + +# ── Public entry ──────────────────────────────────────────────────────── + + +def start_run( + *, + req: RunRequest, + db: EvalDB, + session_store, + backend_registry, + profile_registry, +) -> RunStatus: + """Kick off a run in the background. 
Returns the initial RunStatus. Must be called from a running event loop (uses asyncio.create_task).""" + run_id = str(uuid4()) + status = RunStatus( + run_id=run_id, + state="running", + started_at=datetime.now(timezone.utc), + finished_at=None, + judge_model=req.model, + judge_version=JUDGE_VERSION, + rubric_version=RUBRIC_VERSION, + sessions=[], + ) + _registry.register(status) + task = asyncio.create_task( + _run_loop( + run_id=run_id, + req=req, + db=db, + session_store=session_store, + backend_registry=backend_registry, + profile_registry=profile_registry, + ) + ) + _registry.attach_task(run_id, task) + return status diff --git a/debug/eval/schema.py b/debug/eval/schema.py index a7a56be..63a2396 100644 --- a/debug/eval/schema.py +++ b/debug/eval/schema.py @@ -80,3 +80,84 @@ expert_id: str scores: EvalScores comment: str + + +# ── REST response shapes ───────────────────────────────────────────────── + + +class SessionOverview(BaseModel): + """Row in GET /eval/sessions.""" + + session_id: str + profile_id: str + name: str | None = None + created_at: datetime + last_active: datetime + pinned: bool + msg_count: int + likes: int + dislikes: int + eval_status: Literal["evaluated", "pending", "stale"] + latest_avg: dict | None = None + latest_eval_date: datetime | None = None + latest_judge_version: str | None = None + latest_rubric_version: str | None = None + + +class SessionDetail(BaseModel): + """GET /eval/sessions/{id}.""" + + session_id: str + profile_id: str + name: str | None = None + created_at: datetime + last_active: datetime + msg_count: int + feedback: list[dict] # [{message_index, rating, ...}] + evaluations: list[StoredEvaluation] # newest first + + +class WeeklyAxisMeans(BaseModel): + week_start: str + bucket: str + sample_count: int + axis_means: dict + + +class StatsResponse(BaseModel): + buckets: list[str] + weekly: list[WeeklyAxisMeans] + + +# ── Run trigger / status ───────────────────────────────────────────────── + + +class RunRequest(BaseModel): + """POST /eval/run body.""" + + scope: Literal["unevaluated", 
"session", "all"] = "unevaluated" + session_id: str | None = None + since: datetime | None = None + limit: int | None = None + model: str = "gemma4:31b-cloud" + backend: str = "ollama" + + +class RunSessionStatus(BaseModel): + session_id: str + state: Literal["pending", "running", "ok", "failed"] + avg: dict | None = None + error: str | None = None + + +class RunStatus(BaseModel): + """GET /eval/run/{run_id}.""" + + run_id: str + state: Literal["running", "done", "failed"] + started_at: datetime + finished_at: datetime | None = None + judge_model: str + judge_version: str + rubric_version: str + sessions: list[RunSessionStatus]