"""Session-scoped task plan manager — backed by PostgreSQL KV store."""
from __future__ import annotations
import json
from dataclasses import dataclass, field
from navi.tools._internal.base import Tool, ToolContext, ToolResult, current_session_id, current_user_id
# Glyph shown in front of a task for each known status value.
# Lookups elsewhere in this module fall back to "?" for statuses not listed here.
_STATUS_ICON: dict[str, str] = {
    "pending": "○",      # not started yet
    "in_progress": "◎",  # currently being worked on
    "done": "✓",         # completed
    "failed": "✗",       # attempted and failed
    "skipped": "—",      # skipped
}
@dataclass
class _Task:
    """One step of the session plan, as stored in the todo list."""

    # Human-readable description of the step.
    text: str
    # Current state of the step; a freshly created task starts as "pending".
    status: str = "pending"
    # How the result was verified (required for done, encouraged for failed).
    validation: str = ""
# Module-wide handle to the KV store used for persistence.
# Stays None until set_kv_store() is called (injected at startup by registry.py);
# while it is None the load/save helpers in this module are no-ops.
_kv_store = None


def set_kv_store(kv) -> None:
    """Install *kv* as the shared KvStore instance used by this module.

    Intended to be called once at startup; a later call simply replaces the
    previously installed store.
    """
    global _kv_store
    _kv_store = kv
def _sid(explicit: str | None = None) -> str:
    """Resolve the session id to use for storage.

    Resolution order: the truthy *explicit* argument, then the value of the
    ``current_session_id`` context variable, then the literal ``"__default__"``.

    Args:
        explicit: Session id supplied by the caller; ignored when empty/None.

    Returns:
        A non-empty session id string.
    """
    if explicit:
        return explicit
    try:
        ambient = current_session_id.get()
    except LookupError:
        # A context variable that was never set (and has no default) raises
        # instead of returning None; treat that like "no ambient session" so
        # the "__default__" fallback below is actually reachable.
        ambient = None
    return ambient or "__default__"
def _uid(explicit: str | None = None) -> str | None:
    """Resolve the user id to use for storage.

    An *explicit* value that is not None wins (even an empty string);
    otherwise the ``current_user_id`` context variable is consulted, yielding
    None when it has no value.
    """
    if explicit is None:
        return current_user_id.get(None)
    return explicit
async def _load_tasks(sid: str) -> list[_Task]:
    """Load the task list stored for session *sid*.

    Reads the JSON payload saved under the ``"todo"`` / ``"tasks"`` key for the
    current user (resolved via ``_uid()``) and rebuilds ``_Task`` objects from it.

    Args:
        sid: Session id whose plan should be loaded.

    Returns:
        The stored tasks in plan order, or an empty list when no KV store has
        been injected, nothing is stored, or the payload is not a JSON list of
        objects that each carry a ``"text"`` field.
    """
    if _kv_store is None:
        return []
    raw = await _kv_store.get(_uid(), sid, "todo", "tasks")
    if not raw:
        return []
    try:
        data = json.loads(raw)
    except (TypeError, ValueError):
        # Not JSON (json.JSONDecodeError is a ValueError) or not a str/bytes
        # payload: treat as "no plan" rather than failing the caller.
        return []
    if not isinstance(data, list):
        return []

    tasks: list[_Task] = []
    for item in data:
        if not isinstance(item, dict) or "text" not in item:
            # A structurally broken entry invalidates the whole plan; dropping
            # just that entry would silently shift the indexes of later steps.
            return []
        # Read only the known fields so that an entry carrying extra keys
        # no longer discards the entire plan.
        tasks.append(
            _Task(
                text=item["text"],
                status=item.get("status", "pending"),
                validation=item.get("validation", ""),
            )
        )
    return tasks
async def _save_tasks(sid: str, tasks: list[_Task]) -> None:
    """Persist *tasks* as the plan for session *sid*.

    Each task is serialised to a dict with ``text``, ``status`` and
    ``validation`` keys, and the JSON-encoded list is written under the
    ``"todo"`` / ``"tasks"`` key for the current user. Does nothing when no KV
    store has been injected.
    """
    store = _kv_store
    if store is None:
        return
    payload = []
    for task in tasks:
        payload.append(
            {
                "text": task.text,
                "status": task.status,
                "validation": task.validation,
            }
        )
    await store.set(_uid(), sid, "todo", "tasks", json.dumps(payload))
class TodoTool(Tool):
    """Agent-facing tool for inspecting and updating the session's task plan.

    The operation is selected by ``params["op"]``:

    * ``set``    — replace the plan with a new ordered list of task texts;
    * ``view``   — render the current plan;
    * ``update`` — change the status (and validation note) of one task,
      addressed by its 1-based index;
    * ``clear``  — drop the stored plan.

    Tasks are loaded and saved through the module-level ``_load_tasks`` /
    ``_save_tasks`` helpers.
    """

    name = "todo"
    description = (
        "Task plan tracker. Your todo list is automatically populated from the plan at the start of each task — "
        "you do NOT need to call 'set'. "
        "Indexes are 1-based. Call 'update' with status='in_progress' when you start a step. "
        "Call 'update' immediately after completing or failing each step — before moving to the next. "
        "When marking a step 'done', you MUST provide a 'validation' field describing how you verified the result. "
        "When marking a step 'failed', provide 'validation' explaining what went wrong and what you tried. "
        "Before final response, make sure every completed step, including the final step, is marked done with validation. "
        "Call 'view' to re-orient yourself after sub-agent execution or long tool chains. "
        "Use 'set' only when you need to replace the plan mid-task (rare). "
        "Statuses: pending → in_progress → done / failed / skipped."
    )
    parameters = {
        "type": "object",
        "properties": {
            "op": {
                "type": "string",
                "enum": ["set", "view", "update", "clear"],
                "description": (
                    "set — create/replace the Master Plan with a list of task milestones; "
                    "view — show the current state of the plan; "
                    "update — change the status of a specific task; "
                    "clear — reset the plan"
                ),
            },
            "tasks": {
                "type": "array",
                "items": {"type": "string"},
                "description": "Ordered list of task descriptions (required for 'set').",
            },
            "index": {
                "type": "integer",
                "description": "1-based task index (required for 'update').",
            },
            "status": {
                "type": "string",
                "enum": ["pending", "in_progress", "done", "failed", "skipped"],
                "description": "New status for the task (required for 'update').",
            },
            "validation": {
                "type": "string",
                "description": (
                    "Required when status='done': briefly describe how you verified the result "
                    "(e.g. 'ran test_tool — output matched expected', 'read the file and confirmed content'). "
                    "Encouraged when status='failed': describe what went wrong and what you tried."
                ),
            },
        },
        "required": ["op"],
    }

    def __init__(self, kv_store=None) -> None:
        """Create the tool, optionally injecting the shared KV store.

        Args:
            kv_store: When not None, installed module-wide via ``set_kv_store``.
        """
        if kv_store is not None:
            set_kv_store(kv_store)

    async def execute(self, params: dict, ctx: ToolContext | None = None) -> ToolResult:
        """Run the operation named by ``params["op"]``.

        Args:
            params: Tool arguments; see ``parameters`` for the schema.
            ctx: Optional tool context; its ``session_id`` (and, for ``clear``,
                ``user_id``) take precedence over the ambient context values.

        Returns:
            A ``ToolResult`` whose ``output`` is the rendered plan (or a short
            status message) on success, and whose ``error`` is set on failure.
        """
        sid = _sid(ctx.session_id if ctx else None)
        op = params.get("op")
        if op == "set":
            return await self._op_set(sid, params)
        if op == "view":
            return await self._op_view(sid)
        if op == "update":
            return await self._op_update(sid, params)
        if op == "clear":
            return await self._op_clear(sid, ctx)
        return ToolResult(success=False, output="", error=f"Unknown op: {op!r}")

    async def _op_set(self, sid: str, params: dict) -> ToolResult:
        """Replace the plan with the task texts given in ``params["tasks"]``."""
        raw = params.get("tasks")
        # Require a real sequence: iterating a bare string would otherwise
        # create one task per character.
        if not raw or not isinstance(raw, (list, tuple)):
            return ToolResult(success=False, output="", error="'tasks' list is required for 'set'")
        tasks = [_Task(text=str(t)) for t in raw]
        await _save_tasks(sid, tasks)
        return ToolResult(success=True, output=self._render(sid, tasks))

    async def _op_view(self, sid: str) -> ToolResult:
        """Render the stored plan, or report that none exists."""
        tasks = await _load_tasks(sid)
        if not tasks:
            return ToolResult(success=True, output="No plan set for this session.")
        return ToolResult(success=True, output=self._render(sid, tasks))

    async def _op_update(self, sid: str, params: dict) -> ToolResult:
        """Change the status and validation note of one task (1-based index)."""
        tasks = await _load_tasks(sid)
        if not tasks:
            return ToolResult(success=False, output="", error="No plan set. Use 'set' first.")

        raw_index = params.get("index")
        status = params.get("status")
        # str() guards against a non-string validation value crashing .strip().
        validation = str(params.get("validation") or "").strip()

        if raw_index is None or not status:
            return ToolResult(success=False, output="", error="'index' and 'status' are required for 'update'")

        idx = self._coerce_index(raw_index)
        if idx is None:
            return ToolResult(success=False, output="", error=f"'index' must be an integer, got {raw_index!r}")
        if idx < 1 or idx > len(tasks):
            return ToolResult(success=False, output="", error=f"index {idx} is out of range (plan has {len(tasks)} tasks)")

        # Reject statuses outside the known set so they are never persisted
        # (they would render with the "?" icon and skew progress counts).
        if not isinstance(status, str) or status not in _STATUS_ICON:
            allowed = ", ".join(_STATUS_ICON)
            return ToolResult(
                success=False,
                output="",
                error=f"Unknown status: {status!r} (expected one of: {allowed})",
            )

        if status == "done" and not validation:
            return ToolResult(
                success=False,
                output=(
                    f"Cannot mark step {idx} as done without validation.\n"
                    "Provide a 'validation' field describing how you verified the result before marking it done.\n"
                    "Example: \"ran test_tool — all assertions passed\" or \"read the output file and confirmed expected content\".\n"
                    "If you haven't verified yet — verify first, then update."
                ),
                error="validation_required",
            )

        task = tasks[idx - 1]
        task.status = status
        # Always overwrite the note, even with "": a step that was once "done"
        # and is now marked failed without an explanation must not keep
        # showing its old "[verified: ...]" text.
        task.validation = validation
        await _save_tasks(sid, tasks)

        rendered = self._render(sid, tasks)
        if status == "failed" and not validation:
            # Soft prompt — don't block, but encourage explanation.
            rendered += (
                "\n\n"
                "[Tip: next time add a 'validation' field when marking a step failed — "
                "describe what went wrong and what you tried. "
                "This helps with re-planning.]"
            )
        return ToolResult(success=True, output=rendered)

    async def _op_clear(self, sid: str, ctx: ToolContext | None) -> ToolResult:
        """Drop the stored plan for the session."""
        if _kv_store is not None:
            # NOTE(review): this resolves the user id from ctx.user_id, while
            # _load_tasks/_save_tasks resolve it from the context variable only.
            # If the two can ever differ, 'clear' targets a different scope than
            # the one 'set'/'update' wrote to — confirm they always agree.
            await _kv_store.clear_scope(_uid(ctx.user_id if ctx else None), sid, "todo")
        return ToolResult(success=True, output="Plan cleared.")

    @staticmethod
    def _coerce_index(value) -> int | None:
        """Convert a caller-supplied index to ``int``; return None if it is not integral."""
        if isinstance(value, bool):
            # bool is an int subclass; True/False is never a meaningful index.
            return None
        if isinstance(value, int):
            return value
        if isinstance(value, float):
            return int(value) if value.is_integer() else None
        if isinstance(value, str):
            try:
                return int(value.strip())
            except ValueError:
                return None
        return None

    def _render(self, sid: str, tasks: list[_Task]) -> str:
        """Format *tasks* as a numbered, human-readable plan.

        Args:
            sid: Session id (unused here; kept for interface stability).
            tasks: Tasks in plan order.

        Returns:
            A header with the done/total count followed by one line per task
            (1-based), or ``"Plan is empty."`` when there are no tasks.
        """
        if not tasks:
            return "Plan is empty."
        n = len(tasks)
        done = sum(1 for t in tasks if t.status == "done")
        lines = [f"Plan — {done}/{n} done:"]
        for i, t in enumerate(tasks, 1):
            icon = _STATUS_ICON.get(t.status, "?")
            # The icon alone is enough for pending/done; spell out other states.
            suffix = f" ({t.status})" if t.status not in ("pending", "done") else ""
            validation_note = f" [verified: {t.validation}]" if t.validation else ""
            lines.append(f" {icon} {i}. {t.text}{suffix}{validation_note}")
        return "\n".join(lines)
# ── Public API for agent.py (avoids tight coupling to internal storage) ──
async def get_task_snapshot(session_id: str) -> frozenset[tuple[str, str]]:
    """Return the session's todo list as a frozenset of (task_text, status) pairs.

    Best-effort: any exception raised while loading or building the snapshot
    results in an empty frozenset.
    """
    try:
        snapshot = {(task.text, task.status) for task in await _load_tasks(session_id)}
        return frozenset(snapshot)
    except Exception:
        return frozenset()
async def get_failed_steps(session_id: str) -> frozenset[tuple[int, str]]:
    """Return the failed steps of the session's plan as (1-based index, task_text) pairs.

    Best-effort: any exception raised while loading or collecting the steps
    results in an empty frozenset.
    """
    try:
        failed_steps = set()
        for position, task in enumerate(await _load_tasks(session_id), 1):
            if task.status == "failed":
                failed_steps.add((position, task.text))
        return frozenset(failed_steps)
    except Exception:
        return frozenset()
async def get_progress_message(session_id: str, *, first_iteration: bool = False) -> "Message | None":
    """Build a compact system reminder with current todo state.

    The message contains a one-line progress summary, the tasks that are
    neither done nor skipped (numbered with their 1-based plan index, i.e. the
    index the 'update' operation expects), and one or two "Discipline" hints
    chosen from the failure/completion counts and *first_iteration*.

    Args:
        session_id: Session whose plan should be summarised.
        first_iteration: When True, emit the first-iteration hint instead of
            the finalise/focus hint.

    Returns:
        A ``Message`` with role ``"system"``, or None when the session has no
        tasks or anything goes wrong while building the message (best-effort).
    """
    try:
        tasks = await _load_tasks(session_id)
        if not tasks:
            return None

        n = len(tasks)
        done = sum(1 for t in tasks if t.status == "done")
        failed = sum(1 for t in tasks if t.status == "failed")
        in_progress = sum(1 for t in tasks if t.status == "in_progress")
        skipped = sum(1 for t in tasks if t.status == "skipped")
        # Skipped steps need no further work, so they must not count as
        # pending; otherwise the "Finalise" hint below could never trigger
        # for a plan that contains a skipped step.
        pending = n - done - failed - in_progress - skipped

        # Progress line
        icon_done = _STATUS_ICON["done"]
        icon_failed = _STATUS_ICON["failed"]
        breakdown = f"{in_progress} in progress, {pending} pending"
        if skipped:
            breakdown += f", {skipped} skipped"
        lines = [f"TODO progress: {done}/{n} {icon_done} {failed} {icon_failed} ({breakdown})"]

        # List remaining tasks, keeping each task's position in the full plan
        # so the number shown is the one to pass as 'index' to 'update'.
        remaining = [
            (i, t)
            for i, t in enumerate(tasks, 1)
            if t.status not in ("done", "skipped")
        ]
        if remaining:
            lines.append("Remaining tasks:")
            lines.extend(f" {i}. {t.text}" for i, t in remaining)
        else:
            lines.append("All tasks completed.")

        # Adaptive discipline note
        if failed > 0 and failed >= done:
            lines.append(
                "Discipline: you have more failures than completions. "
                "Stop and diagnose the root cause before continuing."
            )
        elif failed == 1 and done >= 2:
            lines.append(
                "Discipline: one failure so far — acceptable if the root cause is understood. "
                "Document the fix in your response."
            )
        elif failed > 1:
            lines.append(
                "Discipline: multiple failures detected. Consider replanning or delegating."
            )

        if first_iteration:
            lines.append(
                "Discipline: this is the FIRST iteration. Prioritise the highest-impact "
                "remaining task and complete it fully before moving to the next."
            )
        elif pending == 0 and failed == 0:
            lines.append(
                "Discipline: all tasks are done or in progress. Finalise the response now."
            )
        else:
            lines.append(
                "Discipline: focus on completing ONE task per iteration. "
                "Do not start the next task until the current one is verified and marked done."
            )

        # Imported here rather than at module level; a failing import is
        # swallowed by the best-effort handler below.
        from navi.llm.base import Message

        return Message(role="system", content="\n".join(lines))
    except Exception:
        return None
async def set_tasks(session_id: str, task_texts: list[str]) -> None:
    """Replace the session's todo list with fresh pending tasks built from plan steps.

    Every entry of *task_texts* becomes one task with the default status and
    an empty validation note; the list is then saved for *session_id*.
    """
    fresh_plan = []
    for step_text in task_texts:
        fresh_plan.append(_Task(text=step_text))
    await _save_tasks(session_id, fresh_plan)
async def render_todo_lines(session_id: str) -> list[str]:
    """Return a list of formatted todo lines for goal anchoring.

    Each line shows the status icon, the task's 1-based index in brackets, the
    task text and its status. The index is 1-based so that it matches the
    numbering used by the todo tool's 'update' operation and plan rendering.

    Args:
        session_id: Session whose plan should be rendered.

    Returns:
        One formatted line per task in plan order, or an empty list when the
        session has no tasks.
    """
    tasks = await _load_tasks(session_id)
    return [
        f" {_STATUS_ICON.get(t.status, '?')} [{i}] {t.text} ({t.status})"
        for i, t in enumerate(tasks, 1)
    ]