# navi-1 / debug / eval / cli.py
"""Standalone CLI for the eval system.

Invoke from the project root:

    python -m debug.eval run [--session ID] [--since DATE] [--limit N] [--re-evaluate-all] [--dry-run] [--model MODEL] [--backend KEY]
    python -m debug.eval show <session_id>
    python -m debug.eval stats [--days 30] [--csv] [--by-complexity-bucket]    # Phase 4

Phase 3 implements `run` and `show`. Stats lands in Phase 4.
"""

from __future__ import annotations

import argparse
import asyncio
import sys
import traceback
from datetime import datetime, timezone
from typing import Sequence

from navi.config import settings

from .db import EvalDB
from .judge import (
    EXPERT_IDS,
    JUDGE_VERSION,
    RUBRIC_VERSION,
    average_scores,
    evaluate_session,
)


# Default judge model; used as the default value of the `run --model` option.
# NOTE(review): the original note here says the fallback backend probes a
# priority list and picks the first model available, so passing a list would
# also work — that behavior lives outside this file; confirm before relying on it.
DEFAULT_JUDGE_MODEL = "gemma4:31b-cloud"


def _non_negative_int(value: str) -> int:
    """argparse ``type=`` callable that accepts only integers >= 0.

    A negative ``--limit`` would otherwise be used as a slice bound and
    silently drop sessions from the *end* of the list instead of capping it.

    Raises:
        argparse.ArgumentTypeError: if *value* is not an integer or is negative.
    """
    try:
        number = int(value)
    except ValueError:
        raise argparse.ArgumentTypeError(f"invalid int value: {value!r}") from None
    if number < 0:
        raise argparse.ArgumentTypeError(f"must be >= 0, got {number}")
    return number


def _build_parser() -> argparse.ArgumentParser:
    """Build the argument parser for the ``run``, ``show`` and ``stats`` subcommands.

    The chosen subcommand name is stored in ``args.cmd``; a subcommand is
    required, so parsing fails when none is given.
    """
    p = argparse.ArgumentParser(
        prog="debug.eval",
        description="LLM-as-judge evaluation runner for Navi sessions.",
    )
    sub = p.add_subparsers(dest="cmd", required=True)

    # ── run ──
    p_run = sub.add_parser("run", help="Evaluate sessions against the pinned rubric.")
    p_run.add_argument("--session", help="Single session id to evaluate.")
    p_run.add_argument(
        "--since",
        help="ISO date/time — only sessions created at or after this (naive values are treated as UTC).",
    )
    p_run.add_argument(
        "--limit",
        type=_non_negative_int,
        default=None,
        help="Max sessions to process.",
    )
    p_run.add_argument(
        "--re-evaluate-all",
        action="store_true",
        help="Re-evaluate every session, even if a current-version row already exists.",
    )
    p_run.add_argument(
        "--dry-run",
        action="store_true",
        help="List what would be evaluated, do not call the judge.",
    )
    # Plain "store" action: the option holds a single model name, and if the
    # flag is repeated only the last value is kept.
    p_run.add_argument(
        "--model",
        default=DEFAULT_JUDGE_MODEL,
        help=f"Judge model (default: {DEFAULT_JUDGE_MODEL}).",
    )
    p_run.add_argument(
        "--backend",
        default="ollama",
        help="Backend key from BackendRegistry (default: ollama).",
    )

    # ── show ──
    p_show = sub.add_parser("show", help="Print stored evaluations for a session.")
    p_show.add_argument("session_id")

    # ── stats ──
    p_stats = sub.add_parser("stats", help="Aggregate scores across the archive.")
    p_stats.add_argument("--days", type=int, default=30, help="Window in days.")
    p_stats.add_argument("--csv", action="store_true", help="Emit CSV to stdout.")
    p_stats.add_argument(
        "--by-complexity-bucket",
        action="store_true",
        help="Split by 0-25 / 26-50 / 51-75 / 76+ buckets.",
    )

    return p


# ── Helpers ──────────────────────────────────────────────────────────────


def _parse_since(value: str | None) -> datetime | None:
    if not value:
        return None
    dt = datetime.fromisoformat(value)
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    return dt


def _ensure_db_url() -> None:
    """Exit the process with status 1 unless ``settings.database_url`` is set."""
    if settings.database_url:
        return
    print("DATABASE_URL not set — eval system requires postgres.", file=sys.stderr)
    sys.exit(1)


# ── Commands ─────────────────────────────────────────────────────────────


async def _cmd_run(args: argparse.Namespace) -> int:
    """Evaluate the selected sessions with the judge and store the results.

    Session selection:
      * ``--session ID`` evaluates exactly that session; ``--since``,
        ``--limit`` and the already-evaluated filter are not applied.
      * Otherwise every session from the session store is considered, minus
        those already evaluated under the current ``JUDGE_VERSION`` /
        ``RUBRIC_VERSION`` (unless ``--re-evaluate-all``), then filtered by
        ``--since`` and truncated to ``--limit``.

    With ``--dry-run`` the selection is only listed; the judge is not called.

    Returns:
        0 when every session was evaluated (or there was nothing to do),
        1 when the session was not found, ``--since`` could not be parsed,
        the backend could not be resolved, or at least one session failed.
    """
    _ensure_db_url()

    # Validate --since up front so a typo is reported cleanly and before any
    # session-store / database work. It is only consulted when no single
    # --session is requested, so it is only parsed in that case.
    since_dt: datetime | None = None
    if not args.session:
        try:
            since_dt = _parse_since(args.since)
        except ValueError as e:
            print(f"Invalid --since value {args.since!r}: {e}", file=sys.stderr)
            return 1

    # Lazy imports — these instantiate session_store, registries, pools.
    from navi.api.deps import get_backend_registry, get_profile_registry, get_session_store

    db = EvalDB(settings.database_url)
    session_store = get_session_store()
    backend_registry = get_backend_registry()
    profile_registry = get_profile_registry()

    # Pick sessions
    if args.session:
        s = await session_store.get(args.session)
        if s is None:
            print(f"Session not found: {args.session}", file=sys.stderr)
            return 1
        sessions = [s]
    else:
        sessions = await session_store.list_all()
        if not args.re_evaluate_all:
            # Skip sessions that already have a row for the current versions.
            already = await db.evaluated_session_ids(JUDGE_VERSION, RUBRIC_VERSION)
            sessions = [s for s in sessions if s.id not in already]
        if since_dt is not None:
            sessions = [s for s in sessions if s.created_at >= since_dt]
        if args.limit is not None:
            sessions = sessions[: args.limit]

    if not sessions:
        print("Nothing to evaluate.")
        return 0

    if args.dry_run:
        print(f"Would evaluate {len(sessions)} session(s):")
        for s in sessions:
            print(f"  {s.id}  profile={s.profile_id}  msgs={len(s.messages)}  started={s.created_at.isoformat()}")
        return 0

    # Resolve LLM backend
    try:
        llm = backend_registry.get(args.backend)
    except Exception as e:
        print(f"Failed to resolve backend {args.backend!r}: {e}", file=sys.stderr)
        return 1

    print(
        f"Evaluating {len(sessions)} session(s) "
        f"with judge_model={args.model} judge_version={JUDGE_VERSION} rubric_version={RUBRIC_VERSION}"
    )

    failures = 0
    for i, session in enumerate(sessions, 1):
        prefix = f"[{i}/{len(sessions)}] {session.id}  ({session.profile_id}, {len(session.messages)} msgs)"
        print(prefix)
        # Per-session boundary: one failing session is reported and counted,
        # and the remaining sessions are still evaluated.
        try:
            feedback = await db.feedback_by_index(session.id)
            profile = profile_registry.get(session.profile_id)
            metadata, results = await evaluate_session(
                session=session,
                feedback_by_index=feedback,
                profile=profile,
                llm=llm,
                model=args.model,
            )
            await db.insert_evaluation_run(metadata, session.id, results)
            avg = average_scores(results)
            print(f"  -> avg: {avg.model_dump()}")
            for r in results:
                print(f"     {r.expert_id}: {r.scores.model_dump()}")
        except Exception as e:
            failures += 1
            print(f"  ERROR: {e}", file=sys.stderr)
            traceback.print_exc()

    print(f"\nDone. ok={len(sessions) - failures} failed={failures}")
    return 0 if failures == 0 else 1


async def _cmd_show(args: argparse.Namespace) -> int:
    """Print every stored evaluation of ``args.session_id``, grouped by run.

    Each run shows its header, then one section per expert row (scores and
    comment). An AVERAGE section is added only when the run holds at least
    ``len(EXPERT_IDS)`` rows. Always returns 0, including when nothing is stored.
    """
    _ensure_db_url()

    db = EvalDB(settings.database_url)
    evaluations = await db.list_evaluations(args.session_id)
    if not evaluations:
        print("No evaluations found.")
        return 0

    # Bucket rows per run id. Dict insertion order keeps runs in the order the
    # rows arrived (presumably newest first — depends on db.list_evaluations).
    grouped: dict[str, list] = {}
    for row in evaluations:
        key = str(row.eval_run_id)
        if key not in grouped:
            grouped[key] = []
        grouped[key].append(row)

    for run_id, members in grouped.items():
        first = members[0]
        print()
        print(f"=== Run {run_id}  ({first.eval_date.isoformat()}) ===")
        print(f"  Judge: {first.judge_model} ({first.judge_version})   Rubric: {first.rubric_version}")

        for member in members:
            print(f"\n  [{member.expert_id}]")
            for axis, score in member.scores.model_dump().items():
                print(f"    {axis:>26}: {score}")
            print(f"    comment: {member.comment}")

        # Skip the average while the run has fewer rows than there are experts.
        if len(members) < len(EXPERT_IDS):
            continue
        mean = average_scores(members)
        print("\n  AVERAGE")
        for axis, score in mean.model_dump().items():
            print(f"    {axis:>26}: {score}")

    return 0


async def _cmd_stats(args: argparse.Namespace) -> int:
    print("[eval] stats command — lands in Phase 4.", file=sys.stderr)
    print(
        f"[eval] would aggregate: days={args.days} csv={args.csv} "
        f"by_bucket={args.by_complexity_bucket}",
        file=sys.stderr,
    )
    return 2


# ── Entry point ──────────────────────────────────────────────────────────


def main(argv: Sequence[str] | None = None) -> int:
    """Parse *argv*, run the selected subcommand, and return its exit code.

    ``argv=None`` lets argparse read ``sys.argv[1:]``. The subcommand
    coroutine is driven to completion with ``asyncio.run``.
    """
    args = _build_parser().parse_args(argv)
    handlers = {
        "run": _cmd_run,
        "show": _cmd_show,
        "stats": _cmd_stats,
    }
    handler = handlers[args.cmd]
    return asyncio.run(handler(args))


# Script entry: exit the process with the subcommand's return code.
if __name__ == "__main__":
    sys.exit(main())