diff --git a/debug/eval/cli.py b/debug/eval/cli.py index e28329f..8b1b73f 100644 --- a/debug/eval/cli.py +++ b/debug/eval/cli.py @@ -2,13 +2,11 @@ Invoke from the project root: - python -m debug.eval run [--since DATE] [--limit N] [--re-evaluate-all] + python -m debug.eval run [--session ID] [--since DATE] [--limit N] [--re-evaluate-all] [--dry-run] [--model MODEL] python -m debug.eval show - python -m debug.eval stats [--days 30] [--csv] + python -m debug.eval stats [--days 30] [--csv] # Phase 4 -Phase 2 lands the argparse skeleton with command stubs. The real work -(transcript rendering, judge calls, score persistence, stats aggregation) -lands in Phase 3 and Phase 4 — see docs/eval_system.md. +Phase 3 implements `run` and `show`. Stats lands in Phase 4. """ from __future__ import annotations @@ -16,8 +14,26 @@ import argparse import asyncio import sys +import traceback +from datetime import datetime, timezone from typing import Sequence +from navi.config import settings + +from .db import EvalDB +from .judge import ( + EXPERT_IDS, + JUDGE_VERSION, + RUBRIC_VERSION, + average_scores, + evaluate_session, +) + + +# Default judge model. The fallback backend will probe the priority list and +# pick the first one available, so passing a list also works. +DEFAULT_JUDGE_MODEL = "gemma4:31b-cloud" + def _build_parser() -> argparse.ArgumentParser: p = argparse.ArgumentParser( @@ -40,6 +56,16 @@ action="store_true", help="List what would be evaluated, do not call the judge.", ) + p_run.add_argument( + "--model", + default=DEFAULT_JUDGE_MODEL, + help=f"Judge model (default: {DEFAULT_JUDGE_MODEL}). Repeat the flag to set a fallback list.", + ) + p_run.add_argument( + "--backend", + default="ollama", + help="Backend key from BackendRegistry (default: ollama).", + ) p_show = sub.add_parser("show", help="Print stored evaluations for a session.") p_show.add_argument("session_id") @@ -56,27 +82,149 @@ return p -# ── Command stubs ──────────────────────────────────────────────────────── +# ── Helpers ────────────────────────────────────────────────────────────── + + +def _parse_since(value: str | None) -> datetime | None: + if not value: + return None + dt = datetime.fromisoformat(value) + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + return dt + + +def _ensure_db_url() -> None: + if not settings.database_url: + print("DATABASE_URL not set — eval system requires postgres.", file=sys.stderr) + sys.exit(1) + + +# ── Commands ───────────────────────────────────────────────────────────── async def _cmd_run(args: argparse.Namespace) -> int: - print("[eval] run command — not implemented yet (lands in Phase 3).", file=sys.stderr) - print(f"[eval] would process: session={args.session} since={args.since} " - f"limit={args.limit} re_eval_all={args.re_evaluate_all} dry={args.dry_run}", - file=sys.stderr) - return 2 + _ensure_db_url() + + # Lazy imports — these instantiate session_store, registries, pools. + from navi.api.deps import get_backend_registry, get_profile_registry, get_session_store + + db = EvalDB(settings.database_url) + session_store = get_session_store() + backend_registry = get_backend_registry() + profile_registry = get_profile_registry() + + # Pick sessions + if args.session: + s = await session_store.get(args.session) + if s is None: + print(f"Session not found: {args.session}", file=sys.stderr) + return 1 + sessions = [s] + else: + sessions = await session_store.list_all() + if not args.re_evaluate_all: + already = await db.evaluated_session_ids(JUDGE_VERSION, RUBRIC_VERSION) + sessions = [s for s in sessions if s.id not in already] + since_dt = _parse_since(args.since) + if since_dt is not None: + sessions = [s for s in sessions if s.created_at >= since_dt] + if args.limit is not None: + sessions = sessions[: args.limit] + + if not sessions: + print("Nothing to evaluate.") + return 0 + + if args.dry_run: + print(f"Would evaluate {len(sessions)} session(s):") + for s in sessions: + print(f" {s.id} profile={s.profile_id} msgs={len(s.messages)} started={s.created_at.isoformat()}") + return 0 + + # Resolve LLM backend + try: + llm = backend_registry.get(args.backend) + except Exception as e: + print(f"Failed to resolve backend {args.backend!r}: {e}", file=sys.stderr) + return 1 + + print( + f"Evaluating {len(sessions)} session(s) " + f"with judge_model={args.model} judge_version={JUDGE_VERSION} rubric_version={RUBRIC_VERSION}" + ) + + failures = 0 + for i, session in enumerate(sessions, 1): + prefix = f"[{i}/{len(sessions)}] {session.id} ({session.profile_id}, {len(session.messages)} msgs)" + print(prefix) + try: + feedback = await db.feedback_by_index(session.id) + profile = profile_registry.get(session.profile_id) + metadata, results = await evaluate_session( + session=session, + feedback_by_index=feedback, + profile=profile, + llm=llm, + model=args.model, + ) + await db.insert_evaluation_run(metadata, session.id, results) + avg = average_scores(results) + print(f" -> avg: {avg.model_dump()}") + for r in results: + print(f" {r.expert_id}: {r.scores.model_dump()}") + except Exception as e: + failures += 1 + print(f" ERROR: {e}", file=sys.stderr) + traceback.print_exc() + + print(f"\nDone. ok={len(sessions) - failures} failed={failures}") + return 0 if failures == 0 else 1 async def _cmd_show(args: argparse.Namespace) -> int: - print(f"[eval] show command — not implemented yet (lands in Phase 3). session={args.session_id}", - file=sys.stderr) - return 2 + _ensure_db_url() + + db = EvalDB(settings.database_url) + rows = await db.list_evaluations(args.session_id) + if not rows: + print("No evaluations found.") + return 0 + + # Group by run_id, preserving the (already DESC) date order + runs: dict[str, list] = {} + for r in rows: + runs.setdefault(str(r.eval_run_id), []).append(r) + + for run_id, run_rows in runs.items(): + head = run_rows[0] + print() + print(f"=== Run {run_id} ({head.eval_date.isoformat()}) ===") + print( + f" Judge: {head.judge_model} ({head.judge_version}) " + f"Rubric: {head.rubric_version}" + ) + for r in run_rows: + print(f"\n [{r.expert_id}]") + for axis, score in r.scores.model_dump().items(): + print(f" {axis:>26}: {score}") + print(f" comment: {r.comment}") + if len(run_rows) >= len(EXPERT_IDS): + avg = average_scores(run_rows) + print("\n AVERAGE") + for axis, score in avg.model_dump().items(): + print(f" {axis:>26}: {score}") + + return 0 async def _cmd_stats(args: argparse.Namespace) -> int: - print(f"[eval] stats command — not implemented yet (lands in Phase 4). " - f"days={args.days} csv={args.csv} by_bucket={args.by_complexity_bucket}", - file=sys.stderr) + print("[eval] stats command — lands in Phase 4.", file=sys.stderr) + print( + f"[eval] would aggregate: days={args.days} csv={args.csv} " + f"by_bucket={args.by_complexity_bucket}", + file=sys.stderr, + ) return 2 diff --git a/debug/eval/db.py b/debug/eval/db.py index d13aadf..a7a4d06 100644 --- a/debug/eval/db.py +++ b/debug/eval/db.py @@ -1,7 +1,7 @@ """asyncpg helpers for the eval system. -Phase 1 surface: message feedback (like / dislike / clear). Phase 2 will extend -this module with evaluation queries. +Phase 1 surface: message feedback (like / dislike / clear). +Phase 3 surface: evaluation runs — bulk insert, per-session list, completion check. Schema is applied lazily on the first pool acquire — same pattern as navi/core/pg_session_store.py. The full DDL lives in schema.sql alongside. @@ -10,10 +10,13 @@ from __future__ import annotations import asyncio +import json from pathlib import Path import asyncpg +from .schema import EvalRunMetadata, ExpertResult, StoredEvaluation + _SCHEMA_PATH = Path(__file__).parent / "schema.sql" @@ -81,3 +84,97 @@ } for r in rows ] + + async def feedback_by_index(self, session_id: str) -> dict[int, int]: + """Convenience: {message_index: rating} for the judge renderer.""" + rows = await self.list_feedback(session_id) + return {r["message_index"]: r["rating"] for r in rows} + + # ── Evaluations ─────────────────────────────────────────────────────── + + async def insert_evaluation_run( + self, + metadata: EvalRunMetadata, + session_id: str, + results: list[ExpertResult], + ) -> None: + """Persist all expert results from one run under a single eval_run_id.""" + if not results: + return + pool = await self._get_pool() + async with pool.acquire() as conn: + async with conn.transaction(): + for r in results: + await conn.execute( + """ + INSERT INTO evaluations + (id, session_id, eval_run_id, eval_date, judge_model, + judge_version, rubric_version, expert_id, scores, comment) + VALUES (gen_random_uuid(), $1, $2, $3, $4, $5, $6, $7, $8::jsonb, $9) + """, + session_id, + metadata.eval_run_id, + metadata.eval_date, + metadata.judge_model, + metadata.judge_version, + metadata.rubric_version, + r.expert_id, + r.scores.model_dump_json(), + r.comment, + ) + + async def list_evaluations(self, session_id: str) -> list[StoredEvaluation]: + """All evaluation rows for a session, newest run first.""" + pool = await self._get_pool() + async with pool.acquire() as conn: + rows = await conn.fetch( + """ + SELECT id, session_id, eval_run_id, eval_date, judge_model, + judge_version, rubric_version, expert_id, scores, comment + FROM evaluations + WHERE session_id = $1 + ORDER BY eval_date DESC, expert_id ASC + """, + session_id, + ) + return [ + StoredEvaluation.model_validate( + { + "id": r["id"], + "session_id": r["session_id"], + "eval_run_id": r["eval_run_id"], + "eval_date": r["eval_date"], + "judge_model": r["judge_model"], + "judge_version": r["judge_version"], + "rubric_version": r["rubric_version"], + "expert_id": r["expert_id"], + "scores": json.loads(r["scores"]) if isinstance(r["scores"], str) else r["scores"], + "comment": r["comment"], + } + ) + for r in rows + ] + + async def evaluated_session_ids( + self, judge_version: str, rubric_version: str, expected_experts: int = 3 + ) -> set[str]: + """Sessions with a complete run at the current rubric/judge versions. + + A session counts as evaluated only when at least `expected_experts` + distinct expert rows exist for that (judge_version, rubric_version). + """ + pool = await self._get_pool() + async with pool.acquire() as conn: + rows = await conn.fetch( + """ + SELECT session_id + FROM evaluations + WHERE judge_version = $1 AND rubric_version = $2 + GROUP BY session_id + HAVING COUNT(DISTINCT expert_id) >= $3 + """, + judge_version, + rubric_version, + expected_experts, + ) + return {r["session_id"] for r in rows} diff --git a/debug/eval/judge.py b/debug/eval/judge.py index 779a694..27bdd5c 100644 --- a/debug/eval/judge.py +++ b/debug/eval/judge.py @@ -1,22 +1,20 @@ -"""Judge orchestration: render a session, fan out across 3 experts, average. - -Phase 2 ships the skeleton. Real LLM calls and transcript rendering land in -Phase 3. The shape here is intentionally final so cli.py and api.py can wire -against it without churn. -""" +"""Judge orchestration: render a session, fan out across 3 experts, average.""" from __future__ import annotations +import asyncio import json from dataclasses import dataclass from datetime import datetime, timezone from pathlib import Path from typing import Iterable -from uuid import UUID, uuid4 +from uuid import uuid4 import yaml from navi.core.session import Session +from navi.llm.base import LLMBackend, Message +from navi.profiles.base import AgentProfile from .schema import AXIS_NAMES, EvalRunMetadata, EvalScores, ExpertResult @@ -75,25 +73,237 @@ return "\n".join(lines) -# ── Session rendering (Phase 3) ────────────────────────────────────────── +# ── Session rendering ──────────────────────────────────────────────────── + + +_REACTION = {1: "👍", -1: "👎"} + + +def _format_duration(start: datetime, end: datetime) -> str: + delta = end - start + total = int(delta.total_seconds()) + hours, rem = divmod(total, 3600) + minutes, seconds = divmod(rem, 60) + if hours: + return f"{hours}h{minutes:02d}m" + if minutes: + return f"{minutes}m{seconds:02d}s" + return f"{seconds}s" + + +def _render_header( + session: Session, + profile: AgentProfile | None, + feedback_by_index: dict[int, int], +) -> str: + likes = sum(1 for v in feedback_by_index.values() if v == 1) + dislikes = sum(1 for v in feedback_by_index.values() if v == -1) + + counts = {"user": 0, "assistant": 0, "tool": 0, "system": 0} + for m in session.messages: + counts[m.role] = counts.get(m.role, 0) + 1 + + lines = ["=== Session metadata ==="] + lines.append(f"Session id: {session.id}") + if session.name: + lines.append(f"Name: {session.name}") + lines.append(f"Profile: {session.profile_id}") + if profile is not None: + lines.append(f"Profile description: {profile.description}") + if profile.model: + lines.append(f"Model priority list: {', '.join(profile.model)}") + flags = [] + if profile.planning_enabled: + flags.append("planning") + if profile.planning_phase2_enabled: + flags.append("phase2-review") + if profile.planning_phase3_enabled: + flags.append("phase3-plan") + if profile.think_enabled: + flags.append("think") + if profile.adaptive_replan_enabled: + flags.append("adaptive-replan") + if profile.step_validation_enabled: + flags.append("step-validation") + lines.append(f"Active mechanics: {', '.join(flags) if flags else '(none)'}") + lines.append(f"Max iterations: {profile.max_iterations}") + lines.append(f"Temperature: {profile.temperature} | top_k: {profile.top_k} | top_p: {profile.top_p}") + lines.append(f"Started: {session.created_at.isoformat()}") + lines.append(f"Last active: {session.last_active.isoformat()}") + lines.append(f"Duration: {_format_duration(session.created_at, session.last_active)}") + lines.append( + f"Messages: total={len(session.messages)} " + f"user={counts['user']} assistant={counts['assistant']} tool={counts['tool']} system={counts['system']}" + ) + lines.append(f"User feedback: 👍 {likes} | 👎 {dislikes}") + return "\n".join(lines) + + +def _truncate(text: str, limit: int = 4000) -> str: + if len(text) <= limit: + return text + return text[:limit] + f"\n[...truncated, {len(text) - limit} more chars]" + + +def _render_transcript( + session: Session, + feedback_by_index: dict[int, int], +) -> str: + """Render messages in order with reactions inlined. + + Index is the position in session.messages — same key used for feedback. + Each assistant block (assistant tool_calls + tool results + final text) + starts at its first message; the reaction (if any) is keyed on that index. + """ + out: list[str] = [] + i = 0 + n = len(session.messages) + while i < n: + m = session.messages[i] + idx_marker = f"[#{i}]" + reaction = _REACTION.get(feedback_by_index.get(i, 0)) + + if m.is_compression: + out.append(f"{idx_marker} [Context compression event] {m.content or ''}") + i += 1 + continue + + if m.is_summary: + # Summaries are inserted into context, not the displayed history. + # If one ended up in messages, surface it but mark it explicitly so + # the judge knows it isn't original work. + out.append(f"{idx_marker} [Compressed history block — not original]") + out.append(_truncate(m.content or "")) + i += 1 + continue + + if m.role == "system": + # Bare system messages aren't usually saved to .messages, but if + # they are, keep them visible. + out.append(f"{idx_marker} SYSTEM: {_truncate(m.content or '')}") + i += 1 + continue + + if m.role == "user": + block = [f"{idx_marker} USER:"] + if m.content: + block.append(_truncate(m.content)) + if m.images: + block.append(f"[user attached {len(m.images)} image(s)]") + out.append("\n".join(block)) + i += 1 + continue + + if m.role == "assistant": + block = [f"{idx_marker} ASSISTANT"] + if reaction: + block[0] += f" {reaction}" + block[0] += ":" + + if m.is_plan: + block.append("[plan]") + block.append(_truncate(m.content or "")) + out.append("\n".join(block)) + i += 1 + continue + + if m.thinking: + block.append("[thinking]") + block.append(_truncate(m.thinking)) + + if m.tool_calls: + for tc in m.tool_calls: + args_preview = json.dumps(tc.arguments, ensure_ascii=False)[:300] + block.append(f"[tool_call: {tc.name}] {args_preview}") + # Collect matching tool results that follow + i += 1 + while i < n and session.messages[i].role == "tool": + tr = session.messages[i] + block.append( + f"[tool_result: {tr.name or '?'} (id={tr.tool_call_id or '?'})]\n" + + _truncate(tr.content or "") + ) + i += 1 + if m.elapsed_seconds is not None or m.tool_call_count is not None: + meta = [] + if m.elapsed_seconds is not None: + meta.append(f"elapsed={m.elapsed_seconds}s") + if m.tool_call_count is not None: + meta.append(f"tool_calls={m.tool_call_count}") + if m.token_count is not None: + meta.append(f"tokens={m.token_count}") + block.append(f"[meta] {' '.join(meta)}") + out.append("\n".join(block)) + continue + + # Plain assistant text + if m.content: + block.append(_truncate(m.content)) + if m.elapsed_seconds is not None or m.token_count is not None: + meta = [] + if m.elapsed_seconds is not None: + meta.append(f"elapsed={m.elapsed_seconds}s") + if m.token_count is not None: + meta.append(f"tokens={m.token_count}") + block.append(f"[meta] {' '.join(meta)}") + out.append("\n".join(block)) + i += 1 + continue + + if m.role == "tool": + # Stray tool message without preceding tool_calls — render bare. + out.append( + f"{idx_marker} [tool_result orphan: {m.name or '?'}]\n" + + _truncate(m.content or "") + ) + i += 1 + continue + + # Unknown role — keep moving. + i += 1 + + return "\n\n".join(out) + + +def _render_planning_appendix(session: Session) -> str: + if not session.planning_logs: + return "" + lines = ["=== Planning logs ==="] + for n, log in enumerate(session.planning_logs, 1): + ts = log.get("timestamp", "?") + result = log.get("result", "?") + lines.append(f"\n--- Turn {n} (timestamp={ts}, classification={result}) ---") + phases = log.get("phases", {}) + for phase_id, phase in phases.items(): + output = phase.get("output", "") + lines.append(f"\n[phase {phase_id}]") + lines.append(_truncate(output, limit=2000)) + return "\n".join(lines) def render_session( session: Session, feedback_by_index: dict[int, int] | None = None, + profile: AgentProfile | None = None, ) -> RenderedSession: """Render a session into the text the judge will see. - Phase 2 stub. Phase 3 will produce: - - Header: profile, model list, planning flags, timing, like/dislike counts. - - Transcript: every message in order, with user reactions inlined next - to each assistant block, sub-agent traces indented, planning phases - included as-is. No compression-summary substitution. + Header → metadata + counts. + Transcript → every message in order with reactions inlined and tool + results folded under the calling assistant block. No compression-summary + substitution; the judge sees the original messages. + Appendix → planning_logs if any. """ - raise NotImplementedError("render_session lands in Phase 3") + feedback = feedback_by_index or {} + header = _render_header(session, profile, feedback) + transcript = _render_transcript(session, feedback) + appendix = _render_planning_appendix(session) + if appendix: + transcript = f"{transcript}\n\n{appendix}" + return RenderedSession(header=header, transcript=transcript) -# ── Expert call (Phase 3) ──────────────────────────────────────────────── +# ── Expert call ────────────────────────────────────────────────────────── async def run_expert( @@ -101,36 +311,92 @@ expert_id: str, rendered: RenderedSession, rubric_text: str, - llm, # navi.llm.base.LLMBackend — kept untyped to avoid circular import + llm: LLMBackend, model: str | list[str], ) -> ExpertResult: """Run one expert against the rendered session, parse JSON, validate. - Phase 2 stub. Phase 3 will: - 1. Build [system=expert_prompt, user=rubric_text + rendered.as_user_message()] - 2. Call llm.complete(..., temperature=0.2, think=False) without tools. - 3. Strip code fences if any leaked, json.loads, ExpertResult.model_validate. - 4. On parse error: one retry with a "your previous output was invalid JSON" - nudge appended; then raise. + On invalid JSON or schema mismatch: one retry with a corrective nudge. + Subsequent failure raises so the caller can record / skip this session. """ - raise NotImplementedError("run_expert lands in Phase 3") + system_prompt = load_expert_prompt(expert_id) + user_message = f"{rubric_text}\n\n{rendered.as_user_message()}" + + base_messages = [ + Message(role="system", content=system_prompt), + Message(role="user", content=user_message), + ] + + response = await llm.complete( + messages=base_messages, + tools=None, + temperature=0.2, + model=model, + think=False, + ) + raw = response.content or "" + try: + return parse_expert_json(raw, expert_id) + except (json.JSONDecodeError, ValueError) as first_err: + # Single retry with explicit correction nudge. + retry_messages = base_messages + [ + Message(role="assistant", content=raw), + Message( + role="user", + content=( + f"Your previous output was invalid: {first_err}. " + "Return ONLY the JSON object matching the required schema, " + "no prose, no code fences, no extra fields." + ), + ), + ] + response = await llm.complete( + messages=retry_messages, + tools=None, + temperature=0.1, + model=model, + think=False, + ) + return parse_expert_json(response.content or "", expert_id) -# ── Run orchestration (Phase 3) ────────────────────────────────────────── +# ── Run orchestration ──────────────────────────────────────────────────── async def evaluate_session( *, session: Session, feedback_by_index: dict[int, int] | None, - llm, + profile: AgentProfile | None, + llm: LLMBackend, model: str | list[str], ) -> tuple[EvalRunMetadata, list[ExpertResult]]: """Run all three experts on one session. Returns (metadata, results). - Phase 2 stub. + Experts run in parallel via asyncio.gather. If any expert fails after the + retry, the whole gather fails and the caller decides whether to skip. """ - raise NotImplementedError("evaluate_session lands in Phase 3") + rubric = load_rubric() + rubric_text = render_rubric_for_prompt(rubric) + rendered = render_session(session, feedback_by_index, profile) + + judge_model = model[0] if isinstance(model, list) and model else ( + model if isinstance(model, str) else "unknown" + ) + metadata = new_run_metadata(judge_model=judge_model) + + coros = [ + run_expert( + expert_id=eid, + rendered=rendered, + rubric_text=rubric_text, + llm=llm, + model=model, + ) + for eid in EXPERT_IDS + ] + results = await asyncio.gather(*coros) + return metadata, results def average_scores(results: Iterable[ExpertResult]) -> EvalScores: