Newer
Older
navi-1 / navi / core / planning.py
@Eugene Sukhodolskiy Eugene Sukhodolskiy on 1 May 19 KB Improve 3D modeling validation prompts
"""Planning pipeline — extracted from agent.py.

Async generator that runs 3-phase planning (analysis → review → execution plan).
"""

import asyncio
import re
from datetime import datetime, timezone
from typing import AsyncGenerator

import structlog

from navi.config import settings
from navi.llm.base import Message
from navi.tools.base import current_stop_event

from .events import (
    AIHelperTokensUsed,
    PlanningDebugData,
    PlanningStatus,
    PlanReady,
)

log = structlog.get_logger()


def _parse_plan_steps(plan_text: str) -> list[str]:
    """Extract numbered step lines from the **Steps:** section of a plan."""
    m = re.search(r'\*\*Steps:\*\*\s*\n(.*?)(?=\n\s*\*\*[^*\n]+:\*\*|\Z)', plan_text, re.DOTALL)
    if not m:
        return []
    steps_block = m.group(1)
    steps: list[str] = []
    for raw in re.findall(r'^\s*\d+[\.\)]\s*(.+)', steps_block, re.MULTILINE):
        step = raw.strip()
        if not step or step.startswith("["):
            continue
        steps.append(step)
    return steps


class PlanningEngine:
    """Runs the 3-phase planning pipeline."""

    def __init__(self, ctx_builder) -> None:
        self._ctx_builder = ctx_builder

    async def run(
        self,
        context: list[Message],
        profile,
        llm,
        mem: Message | None,
        tool_schemas: list | None = None,
        messages: list[Message] | None = None,
        system_prompt_override: str | None = None,
        is_subagent: bool = False,
        force_plan: bool = False,
    ) -> AsyncGenerator:
        """Planning pipeline (async generator):

        Phase 1 — Analysis (think=profile.think_enabled): reformulate the task,
                   identify subtasks and unknowns. Outputs DIRECT for simple requests.
        Phase 2 — Structured review (conditional, think=False): one critique pass.
        Phase 3 — Execution plan (think=False): assigns each subtask to TOOL / AGENT / SELF.
        """
        # ── Build compact tool list for Phase 2 / Phase 3 ─────────────────────
        if tool_schemas:
            tool_lines = []
            for schema in tool_schemas:
                fn = schema.function if hasattr(schema, "function") else schema.get("function", {})
                name = fn.get("name", "")
                desc = (fn.get("description") or "").split("\n")[0][:80]
                tool_lines.append(f"  - {name}: {desc}")
            available_tools_block = (
                "Available tools (use these exact names for TOOL: assignments):\n"
                + "\n".join(tool_lines)
                + "\n\n"
            )
        else:
            available_tools_block = ""

        _stop = current_stop_event.get()
        _dbg: dict = {"timestamp": datetime.now(timezone.utc).isoformat(), "result": "plan", "phases": {}}

        _base_sys = system_prompt_override if system_prompt_override is not None else self._ctx_builder.build_system_prompt(profile)

        # ── Phase 1: Task analysis ────────────────────────────────────────────
        analysis: str = ""
        if profile.planning_phase1_enabled:
            yield PlanningStatus(phase=1, label="Working on it...", is_subagent=is_subagent)
            phase1_system = Message(
                role="system",
                content=(
                    _base_sys
                    + "\n\n---\n\n"
                    "[PLANNING — PHASE 1: ANALYSIS]\n\n"
                    "Read the user's latest request.\n\n"
                    + (
                        ""
                        if force_plan else
                        "If it is a simple question, casual conversation, or answerable in one step "
                        "without tools — respond with exactly: DIRECT\n\n"
                    )
                    + available_tools_block
                    + "Analyse the request and output:\n\n"
                    "TASK: [one clear sentence — what actually needs to be done]\n"
                    "GOAL: [how you will know the task is complete]\n"
                    "UNKNOWNS: [genuine uncertainties that could block execution, or NONE]\n"
                    "RESOURCES:\n"
                    "- [tool_name]: [what it does] — [limitation if any] — [alternative if limitation blocks the goal]\n"
                    "- context sources: [which of memory / NAVI.md / web you will check and why]\n"
                    "COMPLEXITY: simple | medium | complex — choose based on ambiguity, number of files/systems, risk, and autonomy needed.\n"
                    "SUBTASKS:\n"
                    "1. [discrete unit of work]\n"
                    "2. [discrete unit of work]\n"
                    "ATOMICITY: For each subtask that requires multiple actions — if it fails halfway, "
                    "is any partial result still useful? If not, split it into smaller steps where "
                    "each one delivers an independent, usable result on its own.\n"
                    "REFLECT: yes — if the task is complex (multiple unknowns, external APIs, "
                    "research required, or high-stakes/irreversible actions); "
                    "no — if it is straightforward and the path is clear.\n"
                    "COMMITMENTS: [follow the plan step by step using the todo tool; gather any missing context independently without asking the user]\n\n"
                    "Rules: list enough subtasks to make execution unambiguous. "
                    "Simple tasks usually need 1-3 subtasks; medium tasks 5-9; complex or autonomous tasks 8-15. "
                    "Hard maximum: 15 subtasks. Each must be concrete and actionable. "
                    "No execution yet — analysis only."
                ),
            )
            phase1_ctx: list[Message] = [phase1_system]
            if mem:
                phase1_ctx.append(mem)
            phase1_ctx.extend(m for m in context if m.role != "system")

            try:
                r1 = await asyncio.wait_for(
                    llm.complete(phase1_ctx, tools=None, temperature=0.3, model=profile.model, think=profile.think_enabled),
                    timeout=settings.llm_complete_timeout,
                )
                analysis = (r1.content or "").strip()
            except asyncio.TimeoutError:
                log.warning("agent.planning_phase1_timeout", timeout=settings.llm_complete_timeout)
                _dbg["result"] = "phase1_timeout"
                if not is_subagent:
                    yield PlanningDebugData(log=_dbg)
                return
            except Exception:
                log.warning("agent.planning_phase1_failed", exc_info=True)
                _dbg["result"] = "phase1_error"
                if not is_subagent:
                    yield PlanningDebugData(log=_dbg)
                return

            if r1.prompt_tokens or r1.completion_tokens:
                yield AIHelperTokensUsed(
                    prompt_tokens=r1.prompt_tokens or 0,
                    completion_tokens=r1.completion_tokens or 0,
                )

            _dbg["phases"]["1"] = {
                "output": analysis,
                "prompt_tokens": r1.prompt_tokens or 0,
                "completion_tokens": r1.completion_tokens or 0,
            }

            if not analysis or analysis.upper().startswith("DIRECT"):
                log.debug("agent.planning_skipped", reason="direct")
                _dbg["result"] = "direct"
                if not is_subagent:
                    yield PlanningDebugData(log=_dbg)
                return

            if _stop and _stop.is_set():
                log.debug("agent.planning_stopped", phase=1)
                return
        else:
            log.debug("agent.planning_phase1_skipped")

        # ── Phase 2: Structured review (conditional) ───────────────────────────
        advisor_feedback: str = ""
        needs_reflect = bool(re.search(r"REFLECT\s*:\s*yes", analysis, re.IGNORECASE))

        if profile.planning_phase2_enabled and needs_reflect and not is_subagent:
            yield PlanningStatus(phase=2, label="Reviewing plan...", is_subagent=is_subagent)

            review_system = Message(
                role="system",
                content=(
                    _base_sys
                    + "\n\n---\n\n"
                    "[PLANNING - PHASE 2: STRUCTURED REVIEW]\n\n"
                    "Review the phase 1 task analysis before execution. "
                    "Do not change the user's goal. Do not invent facts. "
                    "Prefer resolving missing information through NAVI.md, docs, manuals, memory, files, "
                    "tool schemas, command output, or web research before asking the user.\n\n"
                    "Return exactly these sections:\n\n"
                    "## Critic\n"
                    "- 3-5 bullets: wrong or unverified assumptions, ignored risks, contradictions, "
                    "and facts that must be verified before acting.\n\n"
                    "## Pragmatist\n"
                    "- 3-5 bullets: simpler path, unnecessary steps, mergeable steps, better executor choices, "
                    "and cheaper ways to reach the user's actual goal.\n\n"
                    "## Detailer\n"
                    "- 3-5 bullets: missing requirements, missing docs/files/tools to inspect, edge cases, "
                    "and validation steps.\n\n"
                    "## Plan Adjustments\n"
                    "- Concrete changes Phase 3 must apply: add/remove/split/merge/reorder steps, "
                    "change TOOL/AGENT/SELF executor, verify specific facts, or defer user questions "
                    "until available sources are checked.\n\n"
                    "Keep output concise. No prose outside these sections.\n\n"
                    f"PHASE 1 ANALYSIS:\n{analysis}"
                ),
            )
            review_ctx: list[Message] = [review_system]
            if mem:
                review_ctx.append(mem)
            review_ctx.extend(m for m in context if m.role != "system")
            try:
                r_review = await asyncio.wait_for(
                    llm.complete(review_ctx, tools=None, temperature=0.35, model=profile.model, think=False),
                    timeout=settings.llm_complete_timeout,
                )
                advisor_feedback = (r_review.content or "").strip()
                if r_review.prompt_tokens or r_review.completion_tokens:
                    yield AIHelperTokensUsed(
                        prompt_tokens=r_review.prompt_tokens or 0,
                        completion_tokens=r_review.completion_tokens or 0,
                    )
                _dbg["phases"]["2"] = {
                    "output": advisor_feedback,
                    "prompt_tokens": r_review.prompt_tokens or 0,
                    "completion_tokens": r_review.completion_tokens or 0,
                }
                log.debug("agent.planning_review_done", has_output=bool(advisor_feedback))
            except Exception:
                log.warning("agent.planning_review_failed", exc_info=True)
                _dbg["phases"]["2"] = {"output": "", "prompt_tokens": 0, "completion_tokens": 0}

            if _stop and _stop.is_set():
                log.debug("agent.planning_stopped", phase=2)
                return

        # ── Phase 3: Execution plan ────────────────────────────────────────────
        if not profile.planning_phase3_enabled:
            log.debug("agent.planning_phase3_skipped")
            _dbg["result"] = "phase1_only"
            if not is_subagent:
                yield PlanningDebugData(log=_dbg)
            return

        yield PlanningStatus(phase=3, label="Building execution plan...", is_subagent=is_subagent)

        advisor_block = (
            "Structured review feedback — apply the Plan Adjustments in your plan:\n\n"
            + advisor_feedback
            + "\n\n---\n\n"
        ) if advisor_feedback else ""

        phase3_system = Message(
            role="system",
            content=(
                _base_sys
                + "\n\n---\n\n"
                "[PLANNING — PHASE 3: EXECUTION PLAN]\n\n"
                "Task analysis:\n\n"
                f"{analysis}\n\n"
                "---\n\n"
                + advisor_block
                + available_tools_block
                + "Now write the execution plan. For each subtask assign a specific executor:\n"
                "- TOOL: <tool_name>  — a single tool call is enough; use exact tool names from the list above\n"
                "- AGENT: <profile_id>  — a bounded subtask needing 3+ tool calls; one subagent handles this ONE step\n"
                "- SELF  — final user-facing synthesis or an internal judgment that needs no tool call\n\n"
                "Executor classification rules (critical):\n"
                "- If a step names or implies a tool action, mark it TOOL with that exact tool name, never SELF.\n"
                "- Use TOOL for searching, reading, writing files, editing files, scratchpad notes, todo updates, "
                "image inspection, rendering, compiling, publishing, sharing, terminal commands, API calls, and verification through tool output.\n"
                "- Use SELF only for synthesis, choosing between already-known options, or explaining completed results.\n"
                "- If a planned step cannot be completed without later calling a tool, it is not SELF.\n\n"
                "Planning boundary (critical):\n"
                "The plan is an execution contract, not an implementation. It may describe intent, order, executor, "
                "inputs, expected outputs, and verification. It must NOT contain implementation code, source snippets, "
                "function bodies, CSS/HTML/SQL/Python/JS, patches, exact file contents, or detailed command scripts. "
                "Implementation belongs later in tool calls, file edits, terminal/code execution, or final artifacts. "
                "A valid plan says what to change and how to verify it, not the code that performs the change.\n\n"
                "Plan depth:\n"
                "- simple: 1-3 steps\n"
                "- medium: 5-9 steps\n"
                "- complex or autonomous: 8-15 steps\n"
                "- hard maximum: 15 steps\n"
                "Use enough steps to make execution unambiguous. Do not compress unrelated actions into one step.\n\n"
                "For every non-trivial task, include steps for information gathering from project notes/docs/files/tool schemas, "
                "implementation or analysis, verification, final synthesis, and NAVI.md updates when stable reusable project facts are discovered.\n\n"
                "AGENT scoping rules (critical):\n"
                "- Each AGENT step is one focused, independently verifiable unit of work.\n"
                "- One AGENT step = one spawn_agent call later. Do NOT bundle multiple concerns.\n"
                "- Comma test: if your step description lists things with 'and' or commas, "
                "each item is a separate step.\n"
                "- Good: 'Research X pricing from 3 sources' | 'Audit SSH config on host Y'\n"
                "- Bad: 'Research everything and write the full report' (too broad — split it)\n\n"
                "Required output format (use exactly this structure):\n\n"
                "## Plan\n\n"
                "**Task:** [reformulated task]\n"
                "**Goal:** [success criterion]\n\n"
                "**Milestones:**\n"
                "A. [strategic phase]\n"
                "B. [strategic phase]\n"
                "C. [strategic phase]\n\n"
                "**Steps:**\n"
                "1. [description] → TOOL: tool_name\n"
                "2. [description] → AGENT: profile_id\n"
                "3. [description] → AGENT: profile_id\n"
                "4. [description] → SELF\n"
                "... continue to the needed depth, up to 15 steps\n\n"
                "**Parallel:** [step numbers that can run simultaneously, or NONE]\n"
                "**Risks:** [unknowns to watch for, or NONE]\n\n"
                "Reject vague steps such as 'research and implement everything', 'fix all issues', "
                "or 'analyze project and make changes'. Split them into concrete, verifiable units. "
                "Do not write prose. Do not start executing. Plan only."
            ),
        )
        phase3_ctx: list[Message] = [phase3_system]
        if mem:
            phase3_ctx.append(mem)
        user_msgs = [m for m in context if m.role == "user"]
        if user_msgs:
            phase3_ctx.append(user_msgs[-1])

        try:
            r2 = await asyncio.wait_for(
                llm.complete(phase3_ctx, tools=None, temperature=0.3, model=profile.model, think=False),
                timeout=settings.llm_complete_timeout,
            )
            plan_text = (r2.content or "").strip()
        except asyncio.TimeoutError:
            log.warning("agent.planning_phase3_timeout", timeout=settings.llm_complete_timeout)
            _dbg["result"] = "phase3_timeout"
            if not is_subagent:
                yield PlanningDebugData(log=_dbg)
            return
        except Exception:
            log.warning("agent.planning_phase3_failed", exc_info=True)
            _dbg["result"] = "phase3_error"
            if not is_subagent:
                yield PlanningDebugData(log=_dbg)
            return

        if r2.prompt_tokens or r2.completion_tokens:
            yield AIHelperTokensUsed(
                prompt_tokens=r2.prompt_tokens or 0,
                completion_tokens=r2.completion_tokens or 0,
            )

        _dbg["phases"]["3"] = {
            "output": plan_text,
            "prompt_tokens": r2.prompt_tokens or 0,
            "completion_tokens": r2.completion_tokens or 0,
        }

        if not plan_text:
            _dbg["result"] = "empty_plan"
            if not is_subagent:
                yield PlanningDebugData(log=_dbg)
            return

        if not re.search(r"^\s*\d+[\.\)]", plan_text, re.MULTILINE):
            log.warning("agent.planning_no_numbered_steps", plan_preview=plan_text[:200])

        if _stop and _stop.is_set():
            log.debug("agent.planning_stopped", phase=3)
            return

        if not re.search(r"(TOOL:|AGENT:|→\s*SELF)", plan_text):
            log.warning("agent.planning_no_executors", hint="plan lacks TOOL/AGENT/SELF assignments")

        context.append(Message(role="assistant", content=plan_text))
        if messages is not None:
            messages.append(Message(role="assistant", content=plan_text, is_plan=True))

        context.append(Message(
            role="system",
            content="Plan is ready. Execute it now step by step, starting with step 1. Use the todo tool to track progress.",
        ))

        _todo_steps = _parse_plan_steps(plan_text)
        if _todo_steps:
            try:
                from navi.tools.todo import set_tasks
                from navi.tools.base import current_session_id as _sid_var
                _sid = _sid_var.get() or "__default__"
                set_tasks(_sid, _todo_steps)
                log.debug("agent.todo_auto_populated", steps=len(_todo_steps), session=_sid)
            except Exception:
                log.warning("agent.todo_auto_populate_failed", exc_info=True)

        log.debug("agent.plan_ready", phases=3 if advisor_feedback else 2, length=len(plan_text))
        if not is_subagent:
            yield PlanningDebugData(log=_dbg)
        yield PlanReady(plan=plan_text, is_subagent=is_subagent)