diff --git a/navi/core/agent.py b/navi/core/agent.py index 0e59591..f0138d7 100644 --- a/navi/core/agent.py +++ b/navi/core/agent.py @@ -30,6 +30,8 @@ from navi.tools.base import Tool, current_event_sink, current_stop_event from .compressor import compress_context, should_compress +from .context_builder import ContextBuilder +from .planning import PlanningEngine from .events import ( AgentEvent, AIHelperTokensUsed, @@ -49,6 +51,7 @@ ) from .registry import BackendRegistry, ProfileRegistry, ToolRegistry from .session import SessionStore +from .tool_executor import ToolExecutor if TYPE_CHECKING: from navi.context_providers._loader import ContextProviderRegistry @@ -151,22 +154,6 @@ return [] -def _parse_plan_steps(plan_text: str) -> list[str]: - """Extract numbered step lines from the **Steps:** section of a plan.""" - import re - m = re.search(r'\*\*Steps:\*\*\s*\n(.*?)(?=\n\s*\*\*[^*\n]+:\*\*|\Z)', plan_text, re.DOTALL) - if not m: - return [] - steps_block = m.group(1) - steps: list[str] = [] - for raw in re.findall(r'^\s*\d+[\.\)]\s*(.+)', steps_block, re.MULTILINE): - step = raw.strip() - if not step or step.startswith("["): - continue - steps.append(step) - return steps - - log = structlog.get_logger() # Sentinel: placed in the event sink by the tool wrapper to signal completion. @@ -213,7 +200,13 @@ self._workers: list["Worker"] = workers or [] self._memory = memory_store self._cp_registry = cp_registry - self._system_prompt_cache: dict[str, str] = {} + self._ctx_builder = ContextBuilder( + profile_registry=profile_registry, + memory_store=memory_store, + cp_registry=cp_registry, + ) + self._tool_executor = ToolExecutor(tool_registry) + self._planning = PlanningEngine(self._ctx_builder) # ------------------------------------------------------------------ # Public interface @@ -230,7 +223,7 @@ tool_schemas = [t.schema() for t in tools] llm = self._get_backend(profile.llm_backend) - mem = await self._memory_msg() + mem = await self._ctx_builder._memory_msg() # Expose session_id to tools (e.g. SSH connection pool) via ContextVar from navi.tools.base import current_session_id as _sid_var @@ -242,13 +235,13 @@ session.context.append(user_msg) await self._sessions.save(session) - ctx_injections = await self._collect_context_injections(profile) + ctx_injections = await self._ctx_builder._collect_context_injections(profile) for iteration in range(profile.max_iterations): log.debug("agent.iteration", session_id=session_id, iteration=iteration) response = await llm.complete( - self._build_context(session.context, profile, mem, - iteration=iteration, max_iterations=profile.max_iterations, - extra_system=ctx_injections), + self._ctx_builder.build(session.context, profile, mem, + iteration=iteration, max_iterations=profile.max_iterations, + extra_system=ctx_injections), tools=tool_schemas if tools else None, temperature=profile.temperature, model=profile.model, @@ -275,7 +268,7 @@ session.messages.append(assistant_msg) session.context.append(assistant_msg) - tool_results, image_injections = await self._execute_tool_calls(response.tool_calls, tools) + tool_results, image_injections = await self._tool_executor._execute_tool_calls(response.tool_calls, tools) session.messages.extend(tool_results) session.context.extend(tool_results) # Image injections are synthetic LLM helpers — context only @@ -334,7 +327,7 @@ tool_schemas = [t.schema() for t in tools] llm = self._get_backend(profile.llm_backend) - mem = await self._memory_msg() + mem = await self._ctx_builder._memory_msg() # Build subagent system prompt — completely separate from the parent's system prompt. # No persona, no orchestrator instructions, no profiles block. @@ -384,7 +377,7 @@ try: # ── Optional planning phase ──────────────────────────────────────────── if profile.subagent_planning_enabled: - async for _ev in self._run_planning( + async for _ev in self._planning.run( context, profile, llm, mem, tool_schemas, system_prompt_override=subagent_sys_msg.content, is_subagent=True, @@ -552,7 +545,7 @@ tool_schemas = [t.schema() for t in tools] llm = self._get_backend(profile.llm_backend) - mem = await self._memory_msg() + mem = await self._ctx_builder._memory_msg() # Expose session_id and model to tools via ContextVar from navi.tools.base import current_session_id as _sid_var, current_model as _model_var @@ -629,7 +622,7 @@ _is_first_message = sum(1 for m in session.messages if m.role == "user") == 1 _force_plan = _is_first_message or profile.planning_mandatory if _is_first_message or profile.planning_enabled: - async for _ev in self._run_planning(session.context, profile, llm, mem, tool_schemas, messages=session.messages, force_plan=_force_plan): + async for _ev in self._planning.run(session.context, profile, llm, mem, tool_schemas, messages=session.messages, force_plan=_force_plan): if isinstance(_ev, AIHelperTokensUsed): _subagent_tokens += _ev.total elif isinstance(_ev, PlanningDebugData): @@ -652,7 +645,7 @@ _known_failed: frozenset[tuple[int, str]] = frozenset() _replan_msg: str | None = None - ctx_injections = await self._collect_context_injections(profile) + ctx_injections = await self._ctx_builder._collect_context_injections(profile) # Tool-calling loop — uses stream_complete() for every turn so thinking # is captured in real-time via ThinkingDelta/ThinkingEnd events. @@ -669,16 +662,16 @@ thinking_active = False context_tokens: int | None = None - built_ctx = self._build_context(session.context, profile, mem, - iteration=iteration, max_iterations=profile.max_iterations, - extra_system=ctx_injections) + built_ctx = self._ctx_builder.build(session.context, profile, mem, + iteration=iteration, max_iterations=profile.max_iterations, + extra_system=ctx_injections) if ( profile.goal_anchoring_enabled and iteration > 0 and iteration % profile.goal_anchoring_interval == 0 ): - built_ctx.append(self._build_goal_anchor(session_id, user_message)) + built_ctx.append(self._ctx_builder._build_goal_anchor(session_id, user_message)) todo_msg = _todo_progress_message(session_id, first_iteration=(iteration == 0)) if todo_msg: @@ -834,7 +827,7 @@ async def _run_with_sentinel(_tc=tc, _holder=result_holder, _sink=sink): try: - _holder.append(await self._run_single_tool(_tc, tool_map)) + _holder.append(await self._tool_executor._run_single_tool(_tc, tool_map)) except Exception as exc: _holder.append(exc) finally: @@ -958,378 +951,6 @@ # Internal helpers # ------------------------------------------------------------------ - async def _run_planning( - self, - context: "list[Message]", - profile, - llm: LLMBackend, - mem: "Message | None", - tool_schemas: list | None = None, - messages: "list[Message] | None" = None, - system_prompt_override: str | None = None, - is_subagent: bool = False, - force_plan: bool = False, - ): - """ - Planning pipeline (async generator): - - Phase 1 — Analysis (think=profile.think_enabled): reformulate the task, - identify subtasks and unknowns. Outputs DIRECT for simple requests. - Outputs Reflect: yes/no to signal whether multi-perspective review - is warranted. - Phase 2 — Structured review (conditional, think=False): one critique pass - with Critic / Pragmatist / Detailer / Plan Adjustments sections. - Runs only when planning_phase2_enabled=True AND phase 1 outputs - Reflect: yes. - Phase 3 — Execution plan (think=False): assigns each subtask to TOOL / AGENT / SELF. - If phase 2 ran, advisor feedback is embedded in the prompt. - - Yields PlanningStatus before each phase so the UI can show progress, - then yields PlanReady when the final plan is ready. - Yields nothing if planning is skipped. - """ - import re as _re - - # ── Build compact tool list for Phase 2 / Phase 3 ───────────────────── - if tool_schemas: - tool_lines = [] - for schema in tool_schemas: - fn = schema.function if hasattr(schema, "function") else schema.get("function", {}) - name = fn.get("name", "") - desc = (fn.get("description") or "").split("\n")[0][:80] - tool_lines.append(f" - {name}: {desc}") - available_tools_block = ( - "Available tools (use these exact names for TOOL: assignments):\n" - + "\n".join(tool_lines) - + "\n\n" - ) - tool_names_list = ", ".join( - (schema.function if hasattr(schema, "function") else schema.get("function", {})).get("name", "") - for schema in tool_schemas - ) - else: - available_tools_block = "" - tool_names_list = "" - - # Read stop event once — checked between all phases - _stop = current_stop_event.get() - - # Debug log — collected across all phases, yielded at the end (main agent only) - _dbg: dict = {"timestamp": datetime.now(timezone.utc).isoformat(), "result": "plan", "phases": {}} - - _base_sys = system_prompt_override if system_prompt_override is not None else self._build_system_prompt(profile) - - # ── Phase 1: Task analysis (with reasoning) ──────────────────────────── - analysis: str = "" - if profile.planning_phase1_enabled: - yield PlanningStatus(phase=1, label="Working on it...", is_subagent=is_subagent) - phase1_system = Message( - role="system", - content=( - _base_sys - + "\n\n---\n\n" - "[PLANNING — PHASE 1: ANALYSIS]\n\n" - "Read the user's latest request.\n\n" - + ( - "" - if force_plan else - "If it is a simple question, casual conversation, or answerable in one step " - "without tools — respond with exactly: DIRECT\n\n" - ) - + available_tools_block - + "Analyse the request and output:\n\n" - "TASK: [one clear sentence — what actually needs to be done]\n" - "GOAL: [how you will know the task is complete]\n" - "UNKNOWNS: [genuine uncertainties that could block execution, or NONE]\n" - "RESOURCES:\n" - "- [tool_name]: [what it does] — [limitation if any] — [alternative if limitation blocks the goal]\n" - "- context sources: [which of memory / NAVI.md / web you will check and why]\n" - "COMPLEXITY: simple | medium | complex — choose based on ambiguity, number of files/systems, risk, and autonomy needed.\n" - "SUBTASKS:\n" - "1. [discrete unit of work]\n" - "2. [discrete unit of work]\n" - "ATOMICITY: For each subtask that requires multiple actions — if it fails halfway, " - "is any partial result still useful? If not, split it into smaller steps where " - "each one delivers an independent, usable result on its own.\n" - "REFLECT: yes — if the task is complex (multiple unknowns, external APIs, " - "research required, or high-stakes/irreversible actions); " - "no — if it is straightforward and the path is clear.\n" - "COMMITMENTS: [follow the plan step by step using the todo tool; gather any missing context independently without asking the user]\n\n" - "Rules: list enough subtasks to make execution unambiguous. " - "Simple tasks usually need 1-3 subtasks; medium tasks 5-9; complex or autonomous tasks 8-15. " - "Hard maximum: 15 subtasks. Each must be concrete and actionable. " - "No execution yet — analysis only." - ), - ) - phase1_ctx: list[Message] = [phase1_system] - if mem: - phase1_ctx.append(mem) - phase1_ctx.extend(m for m in context if m.role != "system") - - try: - r1 = await asyncio.wait_for( - llm.complete(phase1_ctx, tools=None, temperature=0.3, model=profile.model, think=profile.think_enabled), - timeout=settings.llm_complete_timeout, - ) - analysis = (r1.content or "").strip() - except asyncio.TimeoutError: - log.warning("agent.planning_phase1_timeout", timeout=settings.llm_complete_timeout) - _dbg["result"] = "phase1_timeout" - if not is_subagent: - yield PlanningDebugData(log=_dbg) - return - except Exception: - log.warning("agent.planning_phase1_failed", exc_info=True) - _dbg["result"] = "phase1_error" - if not is_subagent: - yield PlanningDebugData(log=_dbg) - return - - if r1.prompt_tokens or r1.completion_tokens: - yield AIHelperTokensUsed( - prompt_tokens=r1.prompt_tokens or 0, - completion_tokens=r1.completion_tokens or 0, - ) - - _dbg["phases"]["1"] = { - "output": analysis, - "prompt_tokens": r1.prompt_tokens or 0, - "completion_tokens": r1.completion_tokens or 0, - } - - if not analysis or analysis.upper().startswith("DIRECT"): - log.debug("agent.planning_skipped", reason="direct") - _dbg["result"] = "direct" - if not is_subagent: - yield PlanningDebugData(log=_dbg) - return - - if _stop and _stop.is_set(): - log.debug("agent.planning_stopped", phase=1) - return - else: - log.debug("agent.planning_phase1_skipped") - - # ── Phase 2: Structured review (conditional) ─────────────────────────── - # Runs only when planning_phase2_enabled=True AND phase 1 signals - # that the task is complex enough to warrant critique. - advisor_feedback: str = "" - needs_reflect = bool(_re.search(r"REFLECT\s*:\s*yes", analysis, _re.IGNORECASE)) - - if profile.planning_phase2_enabled and needs_reflect and not is_subagent: - yield PlanningStatus(phase=2, label="Reviewing plan...", is_subagent=is_subagent) - - review_system = Message( - role="system", - content=( - _base_sys - + "\n\n---\n\n" - "[PLANNING - PHASE 2: STRUCTURED REVIEW]\n\n" - "Review the phase 1 task analysis before execution. " - "Do not change the user's goal. Do not invent facts. " - "Prefer resolving missing information through NAVI.md, docs, manuals, memory, files, " - "tool schemas, command output, or web research before asking the user.\n\n" - "Return exactly these sections:\n\n" - "## Critic\n" - "- 3-5 bullets: wrong or unverified assumptions, ignored risks, contradictions, " - "and facts that must be verified before acting.\n\n" - "## Pragmatist\n" - "- 3-5 bullets: simpler path, unnecessary steps, mergeable steps, better executor choices, " - "and cheaper ways to reach the user's actual goal.\n\n" - "## Detailer\n" - "- 3-5 bullets: missing requirements, missing docs/files/tools to inspect, edge cases, " - "and validation steps.\n\n" - "## Plan Adjustments\n" - "- Concrete changes Phase 3 must apply: add/remove/split/merge/reorder steps, " - "change TOOL/AGENT/SELF executor, verify specific facts, or defer user questions " - "until available sources are checked.\n\n" - "Keep output concise. No prose outside these sections.\n\n" - f"PHASE 1 ANALYSIS:\n{analysis}" - ), - ) - review_ctx: list[Message] = [review_system] - if mem: - review_ctx.append(mem) - review_ctx.extend(m for m in context if m.role != "system") - try: - r_review = await asyncio.wait_for( - llm.complete(review_ctx, tools=None, temperature=0.35, model=profile.model, think=False), - timeout=settings.llm_complete_timeout, - ) - advisor_feedback = (r_review.content or "").strip() - if r_review.prompt_tokens or r_review.completion_tokens: - yield AIHelperTokensUsed( - prompt_tokens=r_review.prompt_tokens or 0, - completion_tokens=r_review.completion_tokens or 0, - ) - _dbg["phases"]["2"] = { - "output": advisor_feedback, - "prompt_tokens": r_review.prompt_tokens or 0, - "completion_tokens": r_review.completion_tokens or 0, - } - log.debug("agent.planning_review_done", has_output=bool(advisor_feedback)) - except Exception: - log.warning("agent.planning_review_failed", exc_info=True) - _dbg["phases"]["2"] = {"output": "", "prompt_tokens": 0, "completion_tokens": 0} - - if _stop and _stop.is_set(): - log.debug("agent.planning_stopped", phase=2) - return - - # ── Phase 3: Execution plan ──────────────────────────────────────────── - if not profile.planning_phase3_enabled: - log.debug("agent.planning_phase3_skipped") - _dbg["result"] = "phase1_only" - if not is_subagent: - yield PlanningDebugData(log=_dbg) - return - - yield PlanningStatus(phase=3, label="Building execution plan...", is_subagent=is_subagent) - - advisor_block = ( - "Structured review feedback — apply the Plan Adjustments in your plan:\n\n" - + advisor_feedback - + "\n\n---\n\n" - ) if advisor_feedback else "" - - phase3_system = Message( - role="system", - content=( - _base_sys - + "\n\n---\n\n" - "[PLANNING — PHASE 3: EXECUTION PLAN]\n\n" - "Task analysis:\n\n" - f"{analysis}\n\n" - "---\n\n" - + advisor_block - + available_tools_block - + "Now write the execution plan. For each subtask assign a specific executor:\n" - "- TOOL: — a single tool call is enough; use exact tool names from the list above\n" - "- AGENT: — a bounded subtask needing 3+ tool calls; one subagent handles this ONE step\n" - "- SELF — final synthesis or a context-dependent single action only\n\n" - "Plan depth:\n" - "- simple: 1-3 steps\n" - "- medium: 5-9 steps\n" - "- complex or autonomous: 8-15 steps\n" - "- hard maximum: 15 steps\n" - "Use enough steps to make execution unambiguous. Do not compress unrelated actions into one step.\n\n" - "For every non-trivial task, include steps for information gathering from project notes/docs/files/tool schemas, " - "implementation or analysis, verification, final synthesis, and NAVI.md updates when stable reusable project facts are discovered.\n\n" - "AGENT scoping rules (critical):\n" - "- Each AGENT step is one focused, independently verifiable unit of work.\n" - "- One AGENT step = one spawn_agent call later. Do NOT bundle multiple concerns.\n" - "- Comma test: if your step description lists things with 'and' or commas, " - "each item is a separate step.\n" - "- Good: 'Research X pricing from 3 sources' | 'Audit SSH config on host Y'\n" - "- Bad: 'Research everything and write the full report' (too broad — split it)\n\n" - "Required output format (use exactly this structure):\n\n" - "## Plan\n\n" - "**Task:** [reformulated task]\n" - "**Goal:** [success criterion]\n\n" - "**Milestones:**\n" - "A. [strategic phase]\n" - "B. [strategic phase]\n" - "C. [strategic phase]\n\n" - "**Steps:**\n" - "1. [description] → TOOL: tool_name\n" - "2. [description] → AGENT: profile_id\n" - "3. [description] → AGENT: profile_id\n" - "4. [description] → SELF\n" - "... continue to the needed depth, up to 15 steps\n\n" - "**Parallel:** [step numbers that can run simultaneously, or NONE]\n" - "**Risks:** [unknowns to watch for, or NONE]\n\n" - "Reject vague steps such as 'research and implement everything', 'fix all issues', " - "or 'analyze project and make changes'. Split them into concrete, verifiable units. " - "Do not write prose. Do not start executing. Plan only." - ), - ) - # Phase 3 only needs the analysis (embedded above) and the original request. - # Full history is intentionally excluded to keep the focus on plan structure. - phase3_ctx: list[Message] = [phase3_system] - if mem: - phase3_ctx.append(mem) - user_msgs = [m for m in context if m.role == "user"] - if user_msgs: - phase3_ctx.append(user_msgs[-1]) - - try: - r2 = await asyncio.wait_for( - llm.complete(phase3_ctx, tools=None, temperature=0.3, model=profile.model, think=False), - timeout=settings.llm_complete_timeout, - ) - plan_text = (r2.content or "").strip() - except asyncio.TimeoutError: - log.warning("agent.planning_phase3_timeout", timeout=settings.llm_complete_timeout) - _dbg["result"] = "phase3_timeout" - if not is_subagent: - yield PlanningDebugData(log=_dbg) - return - except Exception: - log.warning("agent.planning_phase3_failed", exc_info=True) - _dbg["result"] = "phase3_error" - if not is_subagent: - yield PlanningDebugData(log=_dbg) - return - - if r2.prompt_tokens or r2.completion_tokens: - yield AIHelperTokensUsed( - prompt_tokens=r2.prompt_tokens or 0, - completion_tokens=r2.completion_tokens or 0, - ) - - _dbg["phases"]["3"] = { - "output": plan_text, - "prompt_tokens": r2.prompt_tokens or 0, - "completion_tokens": r2.completion_tokens or 0, - } - - if not plan_text: - _dbg["result"] = "empty_plan" - if not is_subagent: - yield PlanningDebugData(log=_dbg) - return - - # Warn if no numbered steps but still use the plan - if not _re.search(r"^\s*\d+[\.\)]", plan_text, _re.MULTILINE): - log.warning("agent.planning_no_numbered_steps", plan_preview=plan_text[:200]) - - if _stop and _stop.is_set(): - log.debug("agent.planning_stopped", phase=3) - return - - # Warn if executor assignments are still missing (plan may be malformed) - if not _re.search(r"(TOOL:|AGENT:|→\s*SELF)", plan_text): - log.warning("agent.planning_no_executors", hint="plan lacks TOOL/AGENT/SELF assignments") - - # Inject plan into context so the main loop continues from it, - # and into messages (with is_plan flag) so the UI can render a plan card after reload. - context.append(Message(role="assistant", content=plan_text)) - if messages is not None: - messages.append(Message(role="assistant", content=plan_text, is_plan=True)) - - # Prompt execution: without this the model treats the plan as a completed response. - context.append(Message( - role="system", - content="Plan is ready. Execute it now step by step, starting with step 1. Use the todo tool to track progress.", - )) - - # Auto-populate todo from plan steps — model only needs to call 'update' after each step. - _todo_steps = _parse_plan_steps(plan_text) - if _todo_steps: - try: - from navi.tools.todo import set_tasks - from navi.tools.base import current_session_id as _sid_var - _sid = _sid_var.get() or "__default__" - set_tasks(_sid, _todo_steps) - log.debug("agent.todo_auto_populated", steps=len(_todo_steps), session=_sid) - except Exception: - log.warning("agent.todo_auto_populate_failed", exc_info=True) - - log.debug("agent.plan_ready", phases=3 if advisor_feedback else 2, length=len(plan_text)) - if not is_subagent: - yield PlanningDebugData(log=_dbg) - yield PlanReady(plan=plan_text, is_subagent=is_subagent) - async def _run_workers( self, session, @@ -1359,133 +980,6 @@ worker=type(worker).__name__, exc_info=True) return events - async def _memory_msg(self) -> "Message | None": - """Return an ephemeral system message with the user memory summary, or None.""" - if not self._memory: - return None - summary = await self._memory.get_summary() - if not summary: - return None - return Message(role="system", content=f"## What I remember about the user\n\n{summary}") - - async def _collect_context_injections(self, profile: "AgentProfile") -> list[Message]: - """Run context providers for this profile and return system messages to inject.""" - if not self._cp_registry: - return [] - providers = self._cp_registry.get_globals() - for p in self._cp_registry.get_named(profile.context_providers): - if p not in providers: - providers.append(p) - msgs: list[Message] = [] - for provider in providers: - try: - text = await provider.get_context() - if text: - msgs.append(Message(role="system", content=text)) - except Exception as exc: - log.warning("context_provider.error", name=getattr(provider, "name", "?"), error=str(exc)) - return msgs - - def _build_context( - self, - session_context: list[Message], - profile: "AgentProfile", - mem: "Message | None", - iteration: int | None = None, - max_iterations: int | None = None, - extra_system: list[Message] | None = None, - ) -> list[Message]: - """Build the full LLM context for one call. - - System prompt is injected fresh from the current profile every time — - it is NOT stored in session.context so that profile switches take - effect immediately without touching stored history. - Memory (if any) is placed right after the system message. - Any system messages already in session.context are stripped (migration safety). - """ - system_msg = Message( - role="system", - content=self._build_system_prompt(profile), - ) - conv = [m for m in session_context if m.role != "system"] - result: list[Message] = [system_msg] - if mem: - result.append(mem) - if extra_system: - result.extend(extra_system) - result.extend(conv) - - if profile.iteration_budget_enabled and iteration is not None and max_iterations is not None: - remaining = max_iterations - iteration - if remaining <= 3: - urgency = f" CRITICAL: only {remaining} iteration(s) left — finish or produce a partial result now, do not start new subtasks." - elif remaining <= 7: - urgency = " Start wrapping up: prioritize completing current work over starting new subtasks." - else: - urgency = "" - result.append(Message( - role="system", - content=f"[Iteration {iteration + 1}/{max_iterations} — {remaining} remaining.{urgency}]", - )) - - return result - - def _build_system_prompt(self, profile: "AgentProfile") -> str: - cached = self._system_prompt_cache.get(profile.id) - if cached is not None: - return cached - - parts: list[str] = [] - - persona = settings.navi_persona.strip() - if persona: - parts.append(persona) - - parts.append(profile.system_prompt) - - # Compact profiles block — every agent knows what other profiles exist - # and when to switch. Injected dynamically so new profiles appear automatically. - other = [p for p in self._profiles.all() if p.id != profile.id] - if other: - lines = [ - "## Available profiles", - f"Current: **{profile.id}**", - ] - for p in other: - desc = p.short_description or p.description - lines.append(f"· {p.id}: {desc}") - lines.append("→ Switch profiles on your own judgment — do not ask for permission. When a task clearly fits another profile, call switch_profile immediately, then inform the user which profile is now active and why. Use list_profiles if you need details about a profile's capabilities.") - parts.append("\n".join(lines)) - - result = "\n\n---\n\n".join(parts) - self._system_prompt_cache[profile.id] = result - return result - - def _build_goal_anchor(self, session_id: str, user_message: str) -> Message: - """Build a goal-anchor system message injected every N iterations. - - Reminds the model of the original user request and the current todo - state so it doesn't drift on long multi-step tasks. - """ - lines = [ - "[Goal anchor]", - f"Original request: {user_message}", - ] - - try: - from navi.tools.todo import render_todo_lines - todo_lines = render_todo_lines(session_id) - if todo_lines: - lines.append("Current todo:") - lines.extend(todo_lines) - except Exception: - pass - - lines.append("Stay on track — complete the remaining pending/in_progress steps.") - lines.append("Use 1-based todo indexes. Mark completed steps done only after verification, with validation.") - lines.append("Before final response, update todo for every completed step, including the final one.") - return Message(role="system", content="\n".join(lines)) - def _tool_list(self, enabled: list[str]) -> list[Tool]: names = list(enabled) extra = _load_user_enabled_tools() @@ -1540,125 +1034,3 @@ "Split the file into smaller parts or delegate to a subagent." ) - async def _run_single_tool( - self, - tc: ToolCallRequest, - tool_map: dict[str, Tool], - ) -> tuple[ToolEvent, Message, "Message | None"]: - """Execute one tool call and return (ToolEvent, tool_msg, optional_image_msg). - - Called via asyncio.create_task() from run_stream() so that the parent - generator can drain the event sink queue concurrently. - """ - tool = tool_map.get(tc.name) - image_msg = None - metadata: dict = {} - if tool is None: - content = f"Error: tool '{tc.name}' not found." - event = ToolEvent(tool_name=tc.name, arguments=tc.arguments, - result=content, success=False) - else: - log.info("tool.execute", tool=tc.name, args=tc.arguments) - result = await tool.execute(tc.arguments) - content = result.to_message_content() - metadata = result.metadata or {} - event = ToolEvent(tool_name=tc.name, arguments=tc.arguments, - result=content, success=result.success, - metadata=metadata) - if result.success and result.metadata and result.metadata.get("is_image"): - b64 = result.metadata.get("base64") - if b64: - image_msg = Message( - role="user", - content=f"[Image loaded via {tc.name} — analyse it]", - images=[b64], - ) - msg = Message(role="tool", content=content, tool_call_id=tc.id, - name=tc.name, metadata=metadata) - return event, msg, image_msg - - async def _execute_tool_calls( - self, tool_calls: list[ToolCallRequest], tools: list[Tool] - ) -> tuple[list[Message], list[Message]]: - tool_map = {t.name: t for t in tools} - middlewares = getattr(self._tools, "_middlewares", []) - - async def _run_one(tc: ToolCallRequest) -> tuple[Message, Message | None]: - tool = tool_map.get(tc.name) - image_msg = None - metadata: dict = {} - if tool is None: - content = f"Error: tool '{tc.name}' not found." - else: - log.info("tool.execute", tool=tc.name, args=tc.arguments) - for mw in middlewares: - await mw.before_execute(tc.name, tc.arguments) - result = await tool.execute(tc.arguments) - for mw in middlewares: - await mw.after_execute(tc.name, tc.arguments, result) - content = result.to_message_content() - metadata = result.metadata or {} - if result.success and result.metadata and result.metadata.get("is_image"): - b64 = result.metadata.get("base64") - if b64: - image_msg = Message( - role="user", - content=f"[Image loaded via {tc.name} — analyse it]", - images=[b64], - ) - tool_msg = Message(role="tool", content=content, tool_call_id=tc.id, name=tc.name, metadata=metadata) - return tool_msg, image_msg - - pairs = await asyncio.gather(*[_run_one(tc) for tc in tool_calls]) - tool_msgs = [p[0] for p in pairs] - image_msgs = [p[1] for p in pairs if p[1] is not None] - return tool_msgs, image_msgs - - async def _execute_tool_calls_streaming( - self, tool_calls: list[ToolCallRequest], tools: list[Tool] - ) -> tuple[list[tuple[ToolEvent, Message]], list[Message]]: - tool_map = {t.name: t for t in tools} - middlewares = getattr(self._tools, "_middlewares", []) - - async def _run_one(tc: ToolCallRequest) -> tuple[ToolEvent, Message, Message | None]: - tool = tool_map.get(tc.name) - image_msg = None - metadata: dict = {} - if tool is None: - content = f"Error: tool '{tc.name}' not found." - event = ToolEvent( - tool_name=tc.name, arguments=tc.arguments, result=content, success=False - ) - else: - log.info("tool.execute", tool=tc.name, args=tc.arguments) - # Run middleware before - for mw in middlewares: - await mw.before_execute(tc.name, tc.arguments) - result = await tool.execute(tc.arguments) - # Run middleware after - for mw in middlewares: - await mw.after_execute(tc.name, tc.arguments, result) - content = result.to_message_content() - metadata = result.metadata or {} - event = ToolEvent( - tool_name=tc.name, - arguments=tc.arguments, - result=content, - success=result.success, - metadata=metadata, - ) - if result.success and result.metadata and result.metadata.get("is_image"): - b64 = result.metadata.get("base64") - if b64: - image_msg = Message( - role="user", - content=f"[Image loaded via {tc.name} — analyse it]", - images=[b64], - ) - msg = Message(role="tool", content=content, tool_call_id=tc.id, name=tc.name, metadata=metadata) - return event, msg, image_msg - - triples = await asyncio.gather(*[_run_one(tc) for tc in tool_calls]) - pairs = [(t[0], t[1]) for t in triples] - image_msgs = [t[2] for t in triples if t[2] is not None] - return pairs, image_msgs diff --git a/navi/core/context_builder.py b/navi/core/context_builder.py new file mode 100644 index 0000000..940e1b3 --- /dev/null +++ b/navi/core/context_builder.py @@ -0,0 +1,157 @@ +"""Build LLM context lists for agent turns. + +Extracted from agent.py to reduce the Agent class surface area. +""" + +from datetime import datetime, timezone +from typing import TYPE_CHECKING + +from navi.config import settings +from navi.llm.base import Message + +if TYPE_CHECKING: + from navi.context_providers._loader import ContextProviderRegistry + from navi.memory.store import MemoryStore + from navi.profiles.base import AgentProfile + + +def render_todo_lines(session_id: str) -> list[str]: + """Return a list of formatted todo lines for goal anchoring.""" + try: + from navi.tools.todo import render_todo_lines as _rtl + return _rtl(session_id) + except Exception: + return [] + + +class ContextBuilder: + """Handles construction of the full LLM context for each turn.""" + + def __init__( + self, + profile_registry, + memory_store: "MemoryStore | None" = None, + cp_registry: "ContextProviderRegistry | None" = None, + ) -> None: + self._profiles = profile_registry + self._memory = memory_store + self._cp_registry = cp_registry + self._system_prompt_cache: dict[str, str] = {} + + def build_system_prompt(self, profile: "AgentProfile") -> str: + """Build the system prompt string for a profile (cached per profile).""" + cached = self._system_prompt_cache.get(profile.id) + if cached is not None: + return cached + + parts: list[str] = [] + persona = settings.navi_persona.strip() + if persona: + parts.append(persona) + parts.append(profile.system_prompt) + + other = [p for p in self._profiles.all() if p.id != profile.id] + if other: + lines = [ + "## Available profiles", + f"Current: **{profile.id}**", + ] + for p in other: + desc = p.short_description or p.description + lines.append(f"· {p.id}: {desc}") + lines.append( + "→ Switch profiles on your own judgment — do not ask for permission. " + "When a task clearly fits another profile, call switch_profile immediately, " + "then inform the user which profile is now active and why. " + "Use list_profiles if you need details about a profile's capabilities." + ) + parts.append("\n".join(lines)) + + result = "\n\n---\n\n".join(parts) + self._system_prompt_cache[profile.id] = result + return result + + def invalidate_system_prompt_cache(self, profile_id: str | None = None) -> None: + if profile_id is None: + self._system_prompt_cache.clear() + else: + self._system_prompt_cache.pop(profile_id, None) + + async def _memory_msg(self) -> "Message | None": + if self._memory is None: + return None + summary = await self._memory.get_summary() + if not summary: + return None + return Message( + role="system", + content=f"## What I remember about the user\n\n{summary}", + ) + + async def _collect_context_injections(self, profile: "AgentProfile") -> list[Message]: + if self._cp_registry is None: + return [] + out: list[Message] = [] + for provider in self._cp_registry.all(): + if not provider.global_provider and provider.name not in profile.context_providers: + continue + try: + text = await provider.get_context() + if text: + out.append(Message(role="system", content=text)) + except Exception: + pass + return out + + def _build_goal_anchor(self, session_id: str, user_message: str) -> Message: + lines = [ + "[Goal anchor]", + f"Original request: {user_message}", + ] + todo_lines = render_todo_lines(session_id) + if todo_lines: + lines.append("Current todo:") + lines.extend(todo_lines) + lines.append("Stay on track — complete the remaining pending/in_progress steps.") + lines.append("Use 1-based todo indexes. Mark completed steps done only after verification, with validation.") + lines.append("Before final response, update todo for every completed step, including the final one.") + return Message(role="system", content="\n".join(lines)) + + def build( + self, + session_context: list[Message], + profile: "AgentProfile", + mem: "Message | None", + iteration: int | None = None, + max_iterations: int | None = None, + extra_system: list[Message] | None = None, + ) -> list[Message]: + system_msg = Message( + role="system", + content=self.build_system_prompt(profile), + ) + conv = [m for m in session_context if m.role != "system"] + result: list[Message] = [system_msg] + if mem: + result.append(mem) + if extra_system: + result.extend(extra_system) + result.extend(conv) + + if profile.iteration_budget_enabled and iteration is not None and max_iterations is not None: + remaining = max_iterations - iteration + if remaining <= 3: + urgency = ( + f" CRITICAL: only {remaining} iteration(s) left — finish or produce " + "a partial result now, do not start new subtasks." + ) + elif remaining <= 7: + urgency = " Start wrapping up: prioritize completing current work over starting new subtasks." + else: + urgency = "" + result.append(Message( + role="system", + content=f"[Iteration {iteration + 1}/{max_iterations} — {remaining} remaining.{urgency}]", + )) + + return result diff --git a/navi/core/planning.py b/navi/core/planning.py new file mode 100644 index 0000000..360b16d --- /dev/null +++ b/navi/core/planning.py @@ -0,0 +1,387 @@ +"""Planning pipeline — extracted from agent.py. + +Async generator that runs 3-phase planning (analysis → review → execution plan). +""" + +import asyncio +import re +from datetime import datetime, timezone +from typing import AsyncGenerator + +import structlog + +from navi.config import settings +from navi.llm.base import Message +from navi.tools.base import current_stop_event + +from .events import ( + AIHelperTokensUsed, + PlanningDebugData, + PlanningStatus, + PlanReady, +) + +log = structlog.get_logger() + + +def _parse_plan_steps(plan_text: str) -> list[str]: + """Extract numbered step lines from the **Steps:** section of a plan.""" + m = re.search(r'\*\*Steps:\*\*\s*\n(.*?)(?=\n\s*\*\*[^*\n]+:\*\*|\Z)', plan_text, re.DOTALL) + if not m: + return [] + steps_block = m.group(1) + steps: list[str] = [] + for raw in re.findall(r'^\s*\d+[\.\)]\s*(.+)', steps_block, re.MULTILINE): + step = raw.strip() + if not step or step.startswith("["): + continue + steps.append(step) + return steps + + +class PlanningEngine: + """Runs the 3-phase planning pipeline.""" + + def __init__(self, ctx_builder) -> None: + self._ctx_builder = ctx_builder + + async def run( + self, + context: list[Message], + profile, + llm, + mem: Message | None, + tool_schemas: list | None = None, + messages: list[Message] | None = None, + system_prompt_override: str | None = None, + is_subagent: bool = False, + force_plan: bool = False, + ) -> AsyncGenerator: + """Planning pipeline (async generator): + + Phase 1 — Analysis (think=profile.think_enabled): reformulate the task, + identify subtasks and unknowns. Outputs DIRECT for simple requests. + Phase 2 — Structured review (conditional, think=False): one critique pass. + Phase 3 — Execution plan (think=False): assigns each subtask to TOOL / AGENT / SELF. + """ + # ── Build compact tool list for Phase 2 / Phase 3 ───────────────────── + if tool_schemas: + tool_lines = [] + for schema in tool_schemas: + fn = schema.function if hasattr(schema, "function") else schema.get("function", {}) + name = fn.get("name", "") + desc = (fn.get("description") or "").split("\n")[0][:80] + tool_lines.append(f" - {name}: {desc}") + available_tools_block = ( + "Available tools (use these exact names for TOOL: assignments):\n" + + "\n".join(tool_lines) + + "\n\n" + ) + else: + available_tools_block = "" + + _stop = current_stop_event.get() + _dbg: dict = {"timestamp": datetime.now(timezone.utc).isoformat(), "result": "plan", "phases": {}} + + _base_sys = system_prompt_override if system_prompt_override is not None else self._ctx_builder.build_system_prompt(profile) + + # ── Phase 1: Task analysis ──────────────────────────────────────────── + analysis: str = "" + if profile.planning_phase1_enabled: + yield PlanningStatus(phase=1, label="Working on it...", is_subagent=is_subagent) + phase1_system = Message( + role="system", + content=( + _base_sys + + "\n\n---\n\n" + "[PLANNING — PHASE 1: ANALYSIS]\n\n" + "Read the user's latest request.\n\n" + + ( + "" + if force_plan else + "If it is a simple question, casual conversation, or answerable in one step " + "without tools — respond with exactly: DIRECT\n\n" + ) + + available_tools_block + + "Analyse the request and output:\n\n" + "TASK: [one clear sentence — what actually needs to be done]\n" + "GOAL: [how you will know the task is complete]\n" + "UNKNOWNS: [genuine uncertainties that could block execution, or NONE]\n" + "RESOURCES:\n" + "- [tool_name]: [what it does] — [limitation if any] — [alternative if limitation blocks the goal]\n" + "- context sources: [which of memory / NAVI.md / web you will check and why]\n" + "COMPLEXITY: simple | medium | complex — choose based on ambiguity, number of files/systems, risk, and autonomy needed.\n" + "SUBTASKS:\n" + "1. [discrete unit of work]\n" + "2. [discrete unit of work]\n" + "ATOMICITY: For each subtask that requires multiple actions — if it fails halfway, " + "is any partial result still useful? If not, split it into smaller steps where " + "each one delivers an independent, usable result on its own.\n" + "REFLECT: yes — if the task is complex (multiple unknowns, external APIs, " + "research required, or high-stakes/irreversible actions); " + "no — if it is straightforward and the path is clear.\n" + "COMMITMENTS: [follow the plan step by step using the todo tool; gather any missing context independently without asking the user]\n\n" + "Rules: list enough subtasks to make execution unambiguous. " + "Simple tasks usually need 1-3 subtasks; medium tasks 5-9; complex or autonomous tasks 8-15. " + "Hard maximum: 15 subtasks. Each must be concrete and actionable. " + "No execution yet — analysis only." + ), + ) + phase1_ctx: list[Message] = [phase1_system] + if mem: + phase1_ctx.append(mem) + phase1_ctx.extend(m for m in context if m.role != "system") + + try: + r1 = await asyncio.wait_for( + llm.complete(phase1_ctx, tools=None, temperature=0.3, model=profile.model, think=profile.think_enabled), + timeout=settings.llm_complete_timeout, + ) + analysis = (r1.content or "").strip() + except asyncio.TimeoutError: + log.warning("agent.planning_phase1_timeout", timeout=settings.llm_complete_timeout) + _dbg["result"] = "phase1_timeout" + if not is_subagent: + yield PlanningDebugData(log=_dbg) + return + except Exception: + log.warning("agent.planning_phase1_failed", exc_info=True) + _dbg["result"] = "phase1_error" + if not is_subagent: + yield PlanningDebugData(log=_dbg) + return + + if r1.prompt_tokens or r1.completion_tokens: + yield AIHelperTokensUsed( + prompt_tokens=r1.prompt_tokens or 0, + completion_tokens=r1.completion_tokens or 0, + ) + + _dbg["phases"]["1"] = { + "output": analysis, + "prompt_tokens": r1.prompt_tokens or 0, + "completion_tokens": r1.completion_tokens or 0, + } + + if not analysis or analysis.upper().startswith("DIRECT"): + log.debug("agent.planning_skipped", reason="direct") + _dbg["result"] = "direct" + if not is_subagent: + yield PlanningDebugData(log=_dbg) + return + + if _stop and _stop.is_set(): + log.debug("agent.planning_stopped", phase=1) + return + else: + log.debug("agent.planning_phase1_skipped") + + # ── Phase 2: Structured review (conditional) ─────────────────────────── + advisor_feedback: str = "" + needs_reflect = bool(re.search(r"REFLECT\s*:\s*yes", analysis, re.IGNORECASE)) + + if profile.planning_phase2_enabled and needs_reflect and not is_subagent: + yield PlanningStatus(phase=2, label="Reviewing plan...", is_subagent=is_subagent) + + review_system = Message( + role="system", + content=( + _base_sys + + "\n\n---\n\n" + "[PLANNING - PHASE 2: STRUCTURED REVIEW]\n\n" + "Review the phase 1 task analysis before execution. " + "Do not change the user's goal. Do not invent facts. " + "Prefer resolving missing information through NAVI.md, docs, manuals, memory, files, " + "tool schemas, command output, or web research before asking the user.\n\n" + "Return exactly these sections:\n\n" + "## Critic\n" + "- 3-5 bullets: wrong or unverified assumptions, ignored risks, contradictions, " + "and facts that must be verified before acting.\n\n" + "## Pragmatist\n" + "- 3-5 bullets: simpler path, unnecessary steps, mergeable steps, better executor choices, " + "and cheaper ways to reach the user's actual goal.\n\n" + "## Detailer\n" + "- 3-5 bullets: missing requirements, missing docs/files/tools to inspect, edge cases, " + "and validation steps.\n\n" + "## Plan Adjustments\n" + "- Concrete changes Phase 3 must apply: add/remove/split/merge/reorder steps, " + "change TOOL/AGENT/SELF executor, verify specific facts, or defer user questions " + "until available sources are checked.\n\n" + "Keep output concise. No prose outside these sections.\n\n" + f"PHASE 1 ANALYSIS:\n{analysis}" + ), + ) + review_ctx: list[Message] = [review_system] + if mem: + review_ctx.append(mem) + review_ctx.extend(m for m in context if m.role != "system") + try: + r_review = await asyncio.wait_for( + llm.complete(review_ctx, tools=None, temperature=0.35, model=profile.model, think=False), + timeout=settings.llm_complete_timeout, + ) + advisor_feedback = (r_review.content or "").strip() + if r_review.prompt_tokens or r_review.completion_tokens: + yield AIHelperTokensUsed( + prompt_tokens=r_review.prompt_tokens or 0, + completion_tokens=r_review.completion_tokens or 0, + ) + _dbg["phases"]["2"] = { + "output": advisor_feedback, + "prompt_tokens": r_review.prompt_tokens or 0, + "completion_tokens": r_review.completion_tokens or 0, + } + log.debug("agent.planning_review_done", has_output=bool(advisor_feedback)) + except Exception: + log.warning("agent.planning_review_failed", exc_info=True) + _dbg["phases"]["2"] = {"output": "", "prompt_tokens": 0, "completion_tokens": 0} + + if _stop and _stop.is_set(): + log.debug("agent.planning_stopped", phase=2) + return + + # ── Phase 3: Execution plan ──────────────────────────────────────────── + if not profile.planning_phase3_enabled: + log.debug("agent.planning_phase3_skipped") + _dbg["result"] = "phase1_only" + if not is_subagent: + yield PlanningDebugData(log=_dbg) + return + + yield PlanningStatus(phase=3, label="Building execution plan...", is_subagent=is_subagent) + + advisor_block = ( + "Structured review feedback — apply the Plan Adjustments in your plan:\n\n" + + advisor_feedback + + "\n\n---\n\n" + ) if advisor_feedback else "" + + phase3_system = Message( + role="system", + content=( + _base_sys + + "\n\n---\n\n" + "[PLANNING — PHASE 3: EXECUTION PLAN]\n\n" + "Task analysis:\n\n" + f"{analysis}\n\n" + "---\n\n" + + advisor_block + + available_tools_block + + "Now write the execution plan. For each subtask assign a specific executor:\n" + "- TOOL: — a single tool call is enough; use exact tool names from the list above\n" + "- AGENT: — a bounded subtask needing 3+ tool calls; one subagent handles this ONE step\n" + "- SELF — final synthesis or a context-dependent single action only\n\n" + "Plan depth:\n" + "- simple: 1-3 steps\n" + "- medium: 5-9 steps\n" + "- complex or autonomous: 8-15 steps\n" + "- hard maximum: 15 steps\n" + "Use enough steps to make execution unambiguous. Do not compress unrelated actions into one step.\n\n" + "For every non-trivial task, include steps for information gathering from project notes/docs/files/tool schemas, " + "implementation or analysis, verification, final synthesis, and NAVI.md updates when stable reusable project facts are discovered.\n\n" + "AGENT scoping rules (critical):\n" + "- Each AGENT step is one focused, independently verifiable unit of work.\n" + "- One AGENT step = one spawn_agent call later. Do NOT bundle multiple concerns.\n" + "- Comma test: if your step description lists things with 'and' or commas, " + "each item is a separate step.\n" + "- Good: 'Research X pricing from 3 sources' | 'Audit SSH config on host Y'\n" + "- Bad: 'Research everything and write the full report' (too broad — split it)\n\n" + "Required output format (use exactly this structure):\n\n" + "## Plan\n\n" + "**Task:** [reformulated task]\n" + "**Goal:** [success criterion]\n\n" + "**Milestones:**\n" + "A. [strategic phase]\n" + "B. [strategic phase]\n" + "C. [strategic phase]\n\n" + "**Steps:**\n" + "1. [description] → TOOL: tool_name\n" + "2. [description] → AGENT: profile_id\n" + "3. [description] → AGENT: profile_id\n" + "4. [description] → SELF\n" + "... continue to the needed depth, up to 15 steps\n\n" + "**Parallel:** [step numbers that can run simultaneously, or NONE]\n" + "**Risks:** [unknowns to watch for, or NONE]\n\n" + "Reject vague steps such as 'research and implement everything', 'fix all issues', " + "or 'analyze project and make changes'. Split them into concrete, verifiable units. " + "Do not write prose. Do not start executing. Plan only." + ), + ) + phase3_ctx: list[Message] = [phase3_system] + if mem: + phase3_ctx.append(mem) + user_msgs = [m for m in context if m.role == "user"] + if user_msgs: + phase3_ctx.append(user_msgs[-1]) + + try: + r2 = await asyncio.wait_for( + llm.complete(phase3_ctx, tools=None, temperature=0.3, model=profile.model, think=False), + timeout=settings.llm_complete_timeout, + ) + plan_text = (r2.content or "").strip() + except asyncio.TimeoutError: + log.warning("agent.planning_phase3_timeout", timeout=settings.llm_complete_timeout) + _dbg["result"] = "phase3_timeout" + if not is_subagent: + yield PlanningDebugData(log=_dbg) + return + except Exception: + log.warning("agent.planning_phase3_failed", exc_info=True) + _dbg["result"] = "phase3_error" + if not is_subagent: + yield PlanningDebugData(log=_dbg) + return + + if r2.prompt_tokens or r2.completion_tokens: + yield AIHelperTokensUsed( + prompt_tokens=r2.prompt_tokens or 0, + completion_tokens=r2.completion_tokens or 0, + ) + + _dbg["phases"]["3"] = { + "output": plan_text, + "prompt_tokens": r2.prompt_tokens or 0, + "completion_tokens": r2.completion_tokens or 0, + } + + if not plan_text: + _dbg["result"] = "empty_plan" + if not is_subagent: + yield PlanningDebugData(log=_dbg) + return + + if not re.search(r"^\s*\d+[\.\)]", plan_text, re.MULTILINE): + log.warning("agent.planning_no_numbered_steps", plan_preview=plan_text[:200]) + + if _stop and _stop.is_set(): + log.debug("agent.planning_stopped", phase=3) + return + + if not re.search(r"(TOOL:|AGENT:|→\s*SELF)", plan_text): + log.warning("agent.planning_no_executors", hint="plan lacks TOOL/AGENT/SELF assignments") + + context.append(Message(role="assistant", content=plan_text)) + if messages is not None: + messages.append(Message(role="assistant", content=plan_text, is_plan=True)) + + context.append(Message( + role="system", + content="Plan is ready. Execute it now step by step, starting with step 1. Use the todo tool to track progress.", + )) + + _todo_steps = _parse_plan_steps(plan_text) + if _todo_steps: + try: + from navi.tools.todo import set_tasks + from navi.tools.base import current_session_id as _sid_var + _sid = _sid_var.get() or "__default__" + set_tasks(_sid, _todo_steps) + log.debug("agent.todo_auto_populated", steps=len(_todo_steps), session=_sid) + except Exception: + log.warning("agent.todo_auto_populate_failed", exc_info=True) + + log.debug("agent.plan_ready", phases=3 if advisor_feedback else 2, length=len(plan_text)) + if not is_subagent: + yield PlanningDebugData(log=_dbg) + yield PlanReady(plan=plan_text, is_subagent=is_subagent) diff --git a/navi/core/tool_executor.py b/navi/core/tool_executor.py new file mode 100644 index 0000000..fe4d289 --- /dev/null +++ b/navi/core/tool_executor.py @@ -0,0 +1,150 @@ +"""Tool execution helpers — extracted from agent.py.""" + +import asyncio +from datetime import datetime, timezone +from typing import TYPE_CHECKING + +import structlog + +from navi.llm.base import Message, ToolCallRequest +from navi.tools.base import Tool + +if TYPE_CHECKING: + from navi.core.events import ToolEvent + +log = structlog.get_logger() + + +class ToolExecutor: + """Runs tool calls and builds ToolEvent / Message results.""" + + def __init__(self, tool_registry) -> None: + self._tools = tool_registry + + async def _run_single_tool( + self, + tc: ToolCallRequest, + tool_map: dict[str, Tool], + ) -> tuple["ToolEvent", Message, "Message | None"]: + """Execute one tool call and return (ToolEvent, tool_msg, optional_image_msg). + + Called via asyncio.create_task() from run_stream() so that the parent + generator can drain the event sink queue concurrently. + """ + from navi.core.events import ToolEvent + + tool = tool_map.get(tc.name) + image_msg = None + metadata: dict = {} + if tool is None: + content = f"Error: tool '{tc.name}' not found." + event = ToolEvent(tool_name=tc.name, arguments=tc.arguments, + result=content, success=False) + else: + log.info("tool.execute", tool=tc.name, args=tc.arguments) + middlewares = getattr(self._tools, "_middlewares", []) + for mw in middlewares: + await mw.before_execute(tc.name, tc.arguments) + result = await tool.execute(tc.arguments) + for mw in middlewares: + await mw.after_execute(tc.name, tc.arguments, result) + content = result.to_message_content() + metadata = result.metadata or {} + event = ToolEvent(tool_name=tc.name, arguments=tc.arguments, + result=content, success=result.success, + metadata=metadata) + if result.success and result.metadata and result.metadata.get("is_image"): + b64 = result.metadata.get("base64") + if b64: + image_msg = Message( + role="user", + content=f"[Image loaded via {tc.name} — analyse it]", + images=[b64], + ) + msg = Message(role="tool", content=content, tool_call_id=tc.id, + name=tc.name, metadata=metadata) + return event, msg, image_msg + + async def _execute_tool_calls( + self, tool_calls: list[ToolCallRequest], tools: list[Tool] + ) -> tuple[list[Message], list[Message]]: + tool_map = {t.name: t for t in tools} + + async def _run_one(tc: ToolCallRequest) -> tuple[Message, Message | None]: + tool = tool_map.get(tc.name) + image_msg = None + metadata: dict = {} + if tool is None: + content = f"Error: tool '{tc.name}' not found." + else: + log.info("tool.execute", tool=tc.name, args=tc.arguments) + middlewares = getattr(self._tools, "_middlewares", []) + for mw in middlewares: + await mw.before_execute(tc.name, tc.arguments) + result = await tool.execute(tc.arguments) + for mw in middlewares: + await mw.after_execute(tc.name, tc.arguments, result) + content = result.to_message_content() + metadata = result.metadata or {} + if result.success and result.metadata and result.metadata.get("is_image"): + b64 = result.metadata.get("base64") + if b64: + image_msg = Message( + role="user", + content=f"[Image loaded via {tc.name} — analyse it]", + images=[b64], + ) + tool_msg = Message(role="tool", content=content, tool_call_id=tc.id, name=tc.name, metadata=metadata) + return tool_msg, image_msg + + pairs = await asyncio.gather(*[_run_one(tc) for tc in tool_calls]) + tool_msgs = [p[0] for p in pairs] + image_msgs = [p[1] for p in pairs if p[1] is not None] + return tool_msgs, image_msgs + + async def _execute_tool_calls_streaming( + self, tool_calls: list[ToolCallRequest], tools: list[Tool] + ) -> tuple[list[tuple["ToolEvent", Message]], list[Message]]: + from navi.core.events import ToolEvent + + tool_map = {t.name: t for t in tools} + middlewares = getattr(self._tools, "_middlewares", []) + + async def _run_one(tc: ToolCallRequest) -> tuple[ToolEvent, Message, Message | None]: + tool = tool_map.get(tc.name) + image_msg = None + metadata: dict = {} + if tool is None: + content = f"Error: tool '{tc.name}' not found." + event = ToolEvent( + tool_name=tc.name, arguments=tc.arguments, result=content, success=False + ) + else: + log.info("tool.execute", tool=tc.name, args=tc.arguments) + for mw in middlewares: + await mw.before_execute(tc.name, tc.arguments) + result = await tool.execute(tc.arguments) + for mw in middlewares: + await mw.after_execute(tc.name, tc.arguments, result) + content = result.to_message_content() + metadata = result.metadata or {} + event = ToolEvent( + tool_name=tc.name, + arguments=tc.arguments, + result=content, + success=result.success, + metadata=metadata, + ) + if result.success and result.metadata and result.metadata.get("is_image"): + b64 = result.metadata.get("base64") + if b64: + image_msg = Message( + role="user", + content=f"[Image loaded via {tc.name} — analyse it]", + images=[b64], + ) + msg = Message(role="tool", content=content, tool_call_id=tc.id, name=tc.name, metadata=metadata) + return event, msg, image_msg + + triples = await asyncio.gather(*[_run_one(tc) for tc in tool_calls]) + return [(t[0], t[1]) for t in triples], [t[2] for t in triples if t[2] is not None]