"""Planning pipeline — extracted from agent.py.
Async generator that runs 3-phase planning (analysis → review → execution plan).
"""
import asyncio
import re
from datetime import datetime, timezone
from typing import AsyncGenerator
import structlog
from navi.config import settings
from navi.llm.base import Message
from navi.tools._internal.base import current_stop_event
from .events import (
AIHelperTokensUsed,
PlanningDebugData,
PlanningStatus,
PlanReady,
)
# Module-level structured logger used by PlanningEngine for planning-phase events.
log = structlog.get_logger()
def _parse_plan_steps(plan_text: str) -> list[str]:
"""Extract numbered step lines from the **Steps:** section of a plan."""
m = re.search(r'\*\*Steps:\*\*\s*\n(.*?)(?=\n\s*\*\*[^*\n]+:\*\*|\Z)', plan_text, re.DOTALL)
if not m:
return []
steps_block = m.group(1)
steps: list[str] = []
for raw in re.findall(r'^\s*\d+[\.\)]\s*(.+)', steps_block, re.MULTILINE):
step = raw.strip()
if not step or step.startswith("["):
continue
steps.append(step)
return steps
class PlanningEngine:
    """Runs the 3-phase planning pipeline.

    The only state kept between calls is the context builder passed to the
    constructor; each call to ``run`` is an independent planning pass.
    """

    def __init__(self, ctx_builder) -> None:
        # ctx_builder must provide build_system_prompt(profile) and
        # _mcp_context_msg(profile); both are called from run().
        self._ctx_builder = ctx_builder

    @staticmethod
    def _build_available_tools_block(tool_schemas: list | None) -> str:
        """Render a compact "name: first description line" tool list for the prompts.

        Each schema may be a mapping or an object exposing a ``function`` entry,
        and that entry may itself be a mapping or an object with ``name`` /
        ``description``. Schemas without a tool name are skipped because the
        prompt tells the model to use these exact names. Returns an empty string
        when there are no named tools.
        """
        def _field(obj, key: str):
            # Attribute access first (SDK-style objects), then mapping lookup.
            if hasattr(obj, key):
                return getattr(obj, key)
            getter = getattr(obj, "get", None)
            return getter(key) if callable(getter) else None

        tool_lines: list[str] = []
        for schema in tool_schemas or ():
            fn = _field(schema, "function") or {}
            name = _field(fn, "name") or ""
            if not name:
                continue
            desc = (_field(fn, "description") or "").split("\n")[0][:80]
            tool_lines.append(f" - {name}: {desc}")
        if not tool_lines:
            return ""
        return (
            "Available tools (use these exact names for TOOL: assignments):\n"
            + "\n".join(tool_lines)
            + "\n\n"
        )

    @staticmethod
    def _phase1_prompt(base_sys: str, available_tools_block: str, force_plan: bool) -> str:
        """Build the phase 1 (task analysis) system prompt."""
        # With force_plan the model is not offered the DIRECT shortcut.
        direct_rule = (
            ""
            if force_plan else
            "If it is a simple question, casual conversation, or answerable in one step "
            "without tools — respond with exactly: DIRECT\n\n"
        )
        return (
            base_sys
            + "\n\n---\n\n"
            "[PLANNING — PHASE 1: ANALYSIS]\n\n"
            "Read the user's latest request.\n\n"
            + direct_rule
            + available_tools_block
            + "Knowledge store rules (critical):\n"
            "- `memory` is only for personal user facts and preferences.\n"
            "- Connected MCP knowledge servers are authoritative only when the active profile exposes their tools.\n"
            "- If the domain is infrastructure and gnexus-book tools are available, use gnexus-book as the primary source and persistence target.\n"
            "- If no relevant MCP tools are available to this profile, do not plan to call unavailable MCP tools; use docs, files, command output, web, or ask the user after checking available sources.\n"
            "- Never use memory for infrastructure inventory, service topology, network routes, proxy mappings, server roles, or service relationships.\n\n"
            "Analyse the request and output:\n\n"
            "TASK: [one clear sentence — what actually needs to be done]\n"
            "GOAL: [how you will know the task is complete]\n"
            "UNKNOWNS: [genuine uncertainties that could block execution, or NONE]\n"
            "RESOURCES:\n"
            "- [tool_name]: [what it does] — [limitation if any] — [alternative if limitation blocks the goal]\n"
            "- context sources: [which of connected MCP knowledge servers / memory / docs / web you will check and why]\n"
            "KNOWLEDGE SOURCE ASSESSMENT:\n"
            "- Domain: [user personal facts / infrastructure / project documentation / own capabilities / external web]\n"
            "- Primary source: [connected knowledge servers / memory / docs / web / source files / command output]\n"
            "- Fallback: [alternative source if primary is unavailable]\n"
            "KNOWLEDGE CAPTURE:\n"
            "- New information to save: [specific facts, conventions, or discoveries that should persist beyond this session]\n"
            "- Target: [memory / connected knowledge server / docs / none — choose the best persistent store available to this profile]\n"
            "- Duplication check: [which target-specific search/read/list step prevents duplicates]\n"
            "- Rationale: [why this knowledge is stable and reusable]\n"
            "COMPLEXITY: simple | medium | complex — choose based on ambiguity, number of files/systems, risk, and autonomy needed.\n"
            "SUBTASKS:\n"
            "1. [discrete unit of work]\n"
            "2. [discrete unit of work]\n"
            "ATOMICITY: For each subtask that requires multiple actions — if it fails halfway, "
            "is any partial result still useful? If not, split it into smaller steps where "
            "each one delivers an independent, usable result on its own.\n"
            "REFLECT: yes — if the task is complex (multiple unknowns, external APIs, "
            "research required, or high-stakes/irreversible actions); "
            "no — if it is straightforward and the path is clear.\n"
            "COMMITMENTS: [follow the plan step by step using the todo tool; gather missing context independently before asking the user; before the final answer, run a knowledge persistence checkpoint]\n\n"
            "Rules: list enough subtasks to make execution unambiguous. "
            "Simple tasks usually need 1-3 subtasks; medium tasks 5-9; complex or autonomous tasks 8-15. "
            "Hard maximum: 15 subtasks. Each must be concrete and actionable. "
            "No execution yet — analysis only."
        )

    @staticmethod
    def _phase2_prompt(base_sys: str, analysis: str) -> str:
        """Build the phase 2 (structured review) system prompt around the phase 1 analysis."""
        return (
            base_sys
            + "\n\n---\n\n"
            "[PLANNING - PHASE 2: STRUCTURED REVIEW]\n\n"
            "Review the phase 1 task analysis before execution. "
            "Do not change the user's goal. Do not invent facts. "
            "Prefer resolving missing information through connected knowledge servers, docs, manuals, memory, files, "
            "tool schemas, command output, or web research before asking the user. "
            "Check that the proposed knowledge source and persistence target match the fact scope: "
            "memory only for personal user facts; connected MCP knowledge servers for their own canonical domains; "
            "docs/manuals for project-wide documentation. Flag any plan that stores infrastructure facts in memory.\n\n"
            "Return exactly these sections:\n\n"
            "## Critic\n"
            "- 3-5 bullets: wrong or unverified assumptions, ignored risks, contradictions, "
            "and facts that must be verified before acting.\n\n"
            "## Pragmatist\n"
            "- 3-5 bullets: simpler path, unnecessary steps, mergeable steps, better executor choices, "
            "and cheaper ways to reach the user's actual goal.\n\n"
            "## Detailer\n"
            "- 3-5 bullets: missing requirements, missing docs/files/tools to inspect, edge cases, "
            "and validation steps.\n\n"
            "## Plan Adjustments\n"
            "- Concrete changes Phase 3 must apply: add/remove/split/merge/reorder steps, "
            "change TOOL/AGENT/SELF executor, verify specific facts, correct the persistence target, add a knowledge persistence checkpoint, or defer user questions "
            "until available sources are checked.\n\n"
            "Keep output concise. No prose outside these sections.\n\n"
            f"PHASE 1 ANALYSIS:\n{analysis}"
        )

    @staticmethod
    def _phase3_prompt(
        base_sys: str,
        analysis: str,
        advisor_feedback: str,
        available_tools_block: str,
    ) -> str:
        """Build the phase 3 (execution plan) system prompt from the analysis and optional review."""
        # The review section is included only when phase 2 produced output.
        advisor_block = (
            "Structured review feedback — apply the Plan Adjustments in your plan:\n\n"
            + advisor_feedback
            + "\n\n---\n\n"
        ) if advisor_feedback else ""
        return (
            base_sys
            + "\n\n---\n\n"
            "[PLANNING — PHASE 3: EXECUTION PLAN]\n\n"
            "Task analysis:\n\n"
            f"{analysis}\n\n"
            "---\n\n"
            + advisor_block
            + available_tools_block
            + "Now write the execution plan. For each subtask assign a specific executor:\n"
            "- TOOL: <tool_name> — a single tool call is enough; use exact tool names from the list above\n"
            "- AGENT: <profile_id> — a bounded subtask needing 3+ tool calls; one subagent handles this ONE step\n"
            "- SELF — final user-facing synthesis or an internal judgment that needs no tool call\n\n"
            "Executor classification rules (critical):\n"
            "- If a step names or implies a tool action, mark it TOOL with that exact tool name, never SELF.\n"
            "- Use TOOL for searching, reading, writing files, editing files, scratchpad notes, todo updates, "
            "image inspection, rendering, compiling, publishing, sharing, terminal commands, API calls, and verification through tool output.\n"
            "- Use SELF only for synthesis, choosing between already-known options, or explaining completed results.\n"
            "- If a planned step cannot be completed without later calling a tool, it is not SELF.\n\n"
            "Planning boundary (critical):\n"
            "The plan is an execution contract, not an implementation. It may describe intent, order, executor, "
            "inputs, expected outputs, and verification. It must NOT contain implementation code, source snippets, "
            "function bodies, CSS/HTML/SQL/Python/JS, patches, exact file contents, or detailed command scripts. "
            "Implementation belongs later in tool calls, file edits, terminal/code execution, or final artifacts. "
            "A valid plan says what to change and how to verify it, not the code that performs the change.\n\n"
            "Plan depth:\n"
            "- simple: 1-3 steps\n"
            "- medium: 5-9 steps\n"
            "- complex or autonomous: 8-15 steps\n"
            "- hard maximum: 15 steps\n"
            "Use enough steps to make execution unambiguous. Do not compress unrelated actions into one step.\n\n"
            "Knowledge source and persistence rules (critical):\n"
            "- `memory` is only for personal user facts and preferences.\n"
            "- Never store infrastructure inventory, service topology, network routes, proxy mappings, server roles, or service relationships in memory.\n"
            "- Connected MCP knowledge servers are authoritative only when the active profile exposes their tools. Do not plan unavailable MCP tool calls.\n"
            "- If the task domain is infrastructure and gnexus-book tools are available, include a gnexus-book read/search step before answering or changing anything.\n"
            "- If execution may discover durable facts, include a dedicated knowledge persistence checkpoint before final synthesis.\n\n"
            "For every non-trivial task, include steps for information gathering from connected knowledge servers/docs/files/tool schemas, "
            "implementation or analysis, verification, knowledge persistence checkpoint, and final synthesis. "
            "Choose the persistence target based on the fact's scope and the active profile's available tools: "
            "memory tool for personal user facts and preferences only; connected knowledge servers for their own canonical domains "
            "(for example gnexus-book for infrastructure inventory when its MCP tools are available); docs/ or manuals/ for project-wide documentation; "
            "filesystem for standalone files. "
            "Always search/read/list the selected target first to avoid duplicates. "
            "The checkpoint can be SELF only when the task could not have discovered durable reusable facts; otherwise assign it to the exact persistence/search tool that will be needed later.\n\n"
            "AGENT scoping rules (critical):\n"
            "- Each AGENT step is one focused, independently verifiable unit of work.\n"
            "- One AGENT step = one spawn_agent call later. Do NOT bundle multiple concerns.\n"
            "- Comma test: if your step description lists things with 'and' or commas, "
            "each item is a separate step.\n"
            "- Good: 'Research X pricing from 3 sources' | 'Audit SSH config on host Y'\n"
            "- Bad: 'Research everything and write the full report' (too broad — split it)\n\n"
            "Required output format (use exactly this structure):\n\n"
            "## Plan\n\n"
            "**Task:** [reformulated task]\n"
            "**Goal:** [success criterion]\n\n"
            "**Milestones:**\n"
            "A. [strategic phase]\n"
            "B. [strategic phase]\n"
            "C. [strategic phase]\n\n"
            "**Steps:**\n"
            "1. [description] → TOOL: tool_name\n"
            "2. [description] → AGENT: profile_id\n"
            "3. [description] → AGENT: profile_id\n"
            "4. Knowledge persistence checkpoint: [search/read selected target; persist stable facts if discovered, or confirm none] → TOOL: tool_name OR SELF\n"
            "5. [final synthesis] → SELF\n"
            "... continue to the needed depth, up to 15 steps\n\n"
            "**Parallel:** [step numbers that can run simultaneously, or NONE]\n"
            "**Risks:** [unknowns to watch for, or NONE]\n\n"
            "Reject vague steps such as 'research and implement everything', 'fix all issues', "
            "or 'analyze project and make changes'. Split them into concrete, verifiable units. "
            "Do not write prose. Do not start executing. Plan only."
        )

    async def run(
        self,
        context: list[Message],
        profile,
        llm,
        mem: Message | None,
        tool_schemas: list | None = None,
        messages: list[Message] | None = None,
        system_prompt_override: str | None = None,
        is_subagent: bool = False,
        force_plan: bool = False,
    ) -> AsyncGenerator:
        """Planning pipeline (async generator).

        Phase 1 — Analysis (think=profile.think_enabled): reformulate the task,
        identify subtasks and unknowns. Outputs DIRECT for simple requests.
        Phase 2 — Structured review (conditional, think=False): one critique pass.
        Phase 3 — Execution plan (think=False): assigns each subtask to TOOL / AGENT / SELF.

        Args:
            context: Conversation messages. Phases 1-2 see every non-system
                message; phase 3 sees only the latest user message. On success
                the plan and an "execute it now" user prompt are appended in place.
            profile: Agent profile; reads ``planning_phase1_enabled``,
                ``planning_phase2_enabled``, ``planning_phase3_enabled``,
                ``think_enabled`` and ``model``.
            llm: Client whose ``complete(...)`` coroutine is awaited for each phase.
            mem: Optional message inserted right after each phase's system prompt.
            tool_schemas: Tool schemas rendered into a compact list for phases 1 and 3.
            messages: Optional message list that receives the same plan messages
                as ``context`` plus a displayable ``is_plan`` copy of the plan.
            system_prompt_override: Used instead of the context builder's system prompt.
            is_subagent: Skips phase 2 and suppresses ``PlanningDebugData`` events.
            force_plan: Removes the DIRECT shortcut from the phase 1 prompt.

        Yields:
            ``PlanningStatus`` at the start of each phase, ``AIHelperTokensUsed``
            after each LLM call that reports usage, ``PlanningDebugData`` (not for
            subagents) when the pipeline finishes or bails out (except on stop),
            and finally ``PlanReady`` when a plan was produced.

        The generator ends without a plan on a DIRECT or empty analysis, a
        phase 1/3 timeout or error, phase 3 being disabled, an empty plan, or
        when the current stop event is set.
        """
        available_tools_block = self._build_available_tools_block(tool_schemas)
        _stop = current_stop_event.get()
        _dbg: dict = {"timestamp": datetime.now(timezone.utc).isoformat(), "result": "plan", "phases": {}}
        _base_sys = (
            system_prompt_override
            if system_prompt_override is not None
            else self._ctx_builder.build_system_prompt(profile)
        )
        _mcp_msg = self._ctx_builder._mcp_context_msg(profile)
        if _mcp_msg:
            _base_sys = _base_sys + "\n\n---\n\n" + (_mcp_msg.content or "")

        # ── Phase 1: Task analysis ────────────────────────────────────────
        analysis: str = ""
        if profile.planning_phase1_enabled:
            yield PlanningStatus(phase=1, label="Working on it...", is_subagent=is_subagent)
            phase1_system = Message(
                role="system",
                content=self._phase1_prompt(_base_sys, available_tools_block, force_plan),
            )
            phase1_ctx: list[Message] = [phase1_system]
            if mem:
                phase1_ctx.append(mem)
            phase1_ctx.extend(m for m in context if m.role != "system")
            try:
                r1 = await asyncio.wait_for(
                    llm.complete(phase1_ctx, tools=None, temperature=0.3, model=profile.model, think=profile.think_enabled),
                    timeout=settings.llm_complete_timeout,
                )
                analysis = (r1.content or "").strip()
            except asyncio.TimeoutError:
                log.warning("agent.planning_phase1_timeout", timeout=settings.llm_complete_timeout)
                _dbg["result"] = "phase1_timeout"
                if not is_subagent:
                    yield PlanningDebugData(log=_dbg)
                return
            except Exception:
                log.warning("agent.planning_phase1_failed", exc_info=True)
                _dbg["result"] = "phase1_error"
                if not is_subagent:
                    yield PlanningDebugData(log=_dbg)
                return
            if r1.prompt_tokens or r1.completion_tokens:
                yield AIHelperTokensUsed(
                    prompt_tokens=r1.prompt_tokens or 0,
                    completion_tokens=r1.completion_tokens or 0,
                )
            _dbg["phases"]["1"] = {
                "output": analysis,
                "prompt_tokens": r1.prompt_tokens or 0,
                "completion_tokens": r1.completion_tokens or 0,
            }
            # Require the whole word: a bare prefix test would also skip
            # planning for analyses starting with e.g. "DIRECTORY" or "DIRECTIONS".
            if not analysis or re.match(r"DIRECT\b", analysis, re.IGNORECASE):
                log.debug("agent.planning_skipped", reason="direct")
                _dbg["result"] = "direct"
                if not is_subagent:
                    yield PlanningDebugData(log=_dbg)
                return
            if _stop and _stop.is_set():
                log.debug("agent.planning_stopped", phase=1)
                return
        else:
            log.debug("agent.planning_phase1_skipped")

        # ── Phase 2: Structured review (conditional) ──────────────────────
        # Runs only when phase 1 asked for reflection; never for subagents.
        advisor_feedback: str = ""
        needs_reflect = bool(re.search(r"REFLECT\s*:\s*yes", analysis, re.IGNORECASE))
        if profile.planning_phase2_enabled and needs_reflect and not is_subagent:
            yield PlanningStatus(phase=2, label="Reviewing plan...", is_subagent=is_subagent)
            review_system = Message(
                role="system",
                content=self._phase2_prompt(_base_sys, analysis),
            )
            review_ctx: list[Message] = [review_system]
            if mem:
                review_ctx.append(mem)
            review_ctx.extend(m for m in context if m.role != "system")
            try:
                r_review = await asyncio.wait_for(
                    llm.complete(review_ctx, tools=None, temperature=0.35, model=profile.model, think=False),
                    timeout=settings.llm_complete_timeout,
                )
                advisor_feedback = (r_review.content or "").strip()
            except Exception:
                # Best-effort: a failed or timed-out review must not block planning.
                log.warning("agent.planning_review_failed", exc_info=True)
                _dbg["phases"]["2"] = {"output": "", "prompt_tokens": 0, "completion_tokens": 0}
            else:
                # Kept outside the try so an exception thrown into the generator
                # at this yield is not misreported as a review failure.
                if r_review.prompt_tokens or r_review.completion_tokens:
                    yield AIHelperTokensUsed(
                        prompt_tokens=r_review.prompt_tokens or 0,
                        completion_tokens=r_review.completion_tokens or 0,
                    )
                _dbg["phases"]["2"] = {
                    "output": advisor_feedback,
                    "prompt_tokens": r_review.prompt_tokens or 0,
                    "completion_tokens": r_review.completion_tokens or 0,
                }
                log.debug("agent.planning_review_done", has_output=bool(advisor_feedback))
            if _stop and _stop.is_set():
                log.debug("agent.planning_stopped", phase=2)
                return

        # ── Phase 3: Execution plan ───────────────────────────────────────
        if not profile.planning_phase3_enabled:
            log.debug("agent.planning_phase3_skipped")
            _dbg["result"] = "phase1_only"
            if not is_subagent:
                yield PlanningDebugData(log=_dbg)
            return
        yield PlanningStatus(phase=3, label="Building execution plan...", is_subagent=is_subagent)
        phase3_system = Message(
            role="system",
            content=self._phase3_prompt(_base_sys, analysis, advisor_feedback, available_tools_block),
        )
        phase3_ctx: list[Message] = [phase3_system]
        if mem:
            phase3_ctx.append(mem)
        # Phase 3 sees only the most recent user message; the analysis in the
        # system prompt carries the rest of the context.
        user_msgs = [m for m in context if m.role == "user"]
        if user_msgs:
            phase3_ctx.append(user_msgs[-1])
        try:
            r2 = await asyncio.wait_for(
                llm.complete(phase3_ctx, tools=None, temperature=0.3, model=profile.model, think=False),
                timeout=settings.llm_complete_timeout,
            )
            plan_text = (r2.content or "").strip()
        except asyncio.TimeoutError:
            log.warning("agent.planning_phase3_timeout", timeout=settings.llm_complete_timeout)
            _dbg["result"] = "phase3_timeout"
            if not is_subagent:
                yield PlanningDebugData(log=_dbg)
            return
        except Exception:
            log.warning("agent.planning_phase3_failed", exc_info=True)
            _dbg["result"] = "phase3_error"
            if not is_subagent:
                yield PlanningDebugData(log=_dbg)
            return
        if r2.prompt_tokens or r2.completion_tokens:
            yield AIHelperTokensUsed(
                prompt_tokens=r2.prompt_tokens or 0,
                completion_tokens=r2.completion_tokens or 0,
            )
        _dbg["phases"]["3"] = {
            "output": plan_text,
            "prompt_tokens": r2.prompt_tokens or 0,
            "completion_tokens": r2.completion_tokens or 0,
        }
        if not plan_text:
            _dbg["result"] = "empty_plan"
            if not is_subagent:
                yield PlanningDebugData(log=_dbg)
            return
        # Format sanity checks only log; a malformed plan is still used.
        if not re.search(r"^\s*\d+[\.\)]", plan_text, re.MULTILINE):
            log.warning("agent.planning_no_numbered_steps", plan_preview=plan_text[:200])
        if _stop and _stop.is_set():
            log.debug("agent.planning_stopped", phase=3)
            return
        if not re.search(r"(TOOL:|AGENT:|→\s*SELF)", plan_text):
            log.warning("agent.planning_no_executors", hint="plan lacks TOOL/AGENT/SELF assignments")

        # Inject the plan (hidden) plus an execution prompt into the live
        # context, and mirror them into `messages` when provided.
        plan_ctx_msg = Message(role="assistant", content=plan_text, is_display=False)
        context.append(plan_ctx_msg)
        if messages is not None:
            messages.append(plan_ctx_msg)
            messages.append(Message(role="assistant", content=plan_text, is_plan=True, is_context=False))
        prompt_msg = Message(
            role="user",
            content="Plan is ready. Execute it now step by step, starting with step 1. Use the todo tool to track progress.",
            is_display=False,
        )
        context.append(prompt_msg)
        if messages is not None:
            messages.append(prompt_msg)

        # Best-effort: seed the session's todo list with the parsed plan steps.
        _todo_steps = _parse_plan_steps(plan_text)
        if _todo_steps:
            try:
                from navi.tools.todo import set_tasks
                from navi.tools._internal.base import current_session_id as _sid_var
                _sid = _sid_var.get() or "__default__"
                await set_tasks(_sid, _todo_steps)
                log.debug("agent.todo_auto_populated", steps=len(_todo_steps), session=_sid)
            except Exception:
                log.warning("agent.todo_auto_populate_failed", exc_info=True)

        # Count the phases that contributed output: phase 1 only if it produced
        # an analysis (it may be disabled), phase 2 only if it produced
        # feedback, plus phase 3 itself.
        phases_used = 1 + int(bool(analysis)) + int(bool(advisor_feedback))
        log.debug("agent.plan_ready", phases=phases_used, length=len(plan_text))
        if not is_subagent:
            yield PlanningDebugData(log=_dbg)
        yield PlanReady(plan=plan_text, is_subagent=is_subagent)