Newer
Older
navi-1 / navi / core / context_builder.py
@Eugene Sukhodolskiy Eugene Sukhodolskiy on 12 May 13 KB Clarify knowledge persistence prompts
"""Build LLM context lists for agent turns.

Extracted from agent.py to reduce the Agent class surface area.
"""

from datetime import datetime, timezone
from typing import TYPE_CHECKING

import structlog

from navi.llm.base import Message

import navi.config as _config

# Structured logger shared by the functions and methods in this module.
log = structlog.get_logger()

# These names are only referenced in (string) type annotations; the imports
# are evaluated by static type checkers and skipped at runtime.
if TYPE_CHECKING:
    from navi.context_providers._loader import ContextProviderRegistry
    from navi.memory.store import MemoryStore
    from navi.profiles.base import AgentProfile


def render_todo_lines(session_id: str) -> list[str]:
    """Return formatted todo lines for *session_id*, used for goal anchoring.

    Delegates to ``navi.tools.todo.render_todo_lines``. This is best effort:
    any failure (including the todo module failing to import) yields an empty
    list so context building is never aborted by it, but the failure is logged
    instead of being silently discarded.
    """
    try:
        # Imported lazily so this module does not load the todo tool at import time.
        from navi.tools.todo import render_todo_lines as _rtl
        return _rtl(session_id)
    except Exception:
        log.warning("render_todo_lines.failed", session_id=session_id, exc_info=True)
        return []


class ContextBuilder:
    """Handles construction of the full LLM context for each turn."""

    def __init__(
        self,
        profile_registry,
        memory_store: "MemoryStore | None" = None,
        cp_registry: "ContextProviderRegistry | None" = None,
        mcp_manager=None,
    ) -> None:
        self._profiles = profile_registry
        self._memory = memory_store
        self._cp_registry = cp_registry
        self._mcp_manager = mcp_manager
        self._system_prompt_cache: dict[str, str] = {}

    def build_system_prompt(self, profile: "AgentProfile") -> str:
        """Build the system prompt string for a profile (cached per profile)."""
        cached = self._system_prompt_cache.get(profile.id)
        if cached is not None:
            return cached

        parts: list[str] = []
        persona = _config.settings.navi_persona.strip()
        if persona:
            parts.append(persona)
        parts.append(profile.system_prompt)

        other = [p for p in self._profiles.all() if p.id != profile.id]
        if other:
            lines = [
                "## Available profiles",
                f"Current: **{profile.id}**",
            ]
            for p in other:
                desc = p.short_description or p.description
                lines.append(f"· {p.id}: {desc}")
            lines.append(
                "→ Switch profiles on your own judgment — do not ask for permission. "
                "When a task clearly fits another profile, call switch_profile immediately, "
                "then inform the user which profile is now active and why. "
                "Use list_profiles if you need details about a profile's capabilities."
            )
            parts.append("\n".join(lines))

        result = "\n\n---\n\n".join(parts)
        self._system_prompt_cache[profile.id] = result
        return result

    def invalidate_system_prompt_cache(self, profile_id: str | None = None) -> None:
        if profile_id is None:
            self._system_prompt_cache.clear()
        else:
            self._system_prompt_cache.pop(profile_id, None)

    async def _memory_msg(self, user_id: str | None = None) -> "Message | None":
        if self._memory is None:
            return None
        summary = await self._memory.get_summary(user_id=user_id)
        if not summary:
            return None
        return Message(
            role="system",
            content=f"## What I remember about the user\n\n{summary}",
        )

    async def _memory_facts_msg(
        self,
        user_message: str,
        user_id: str | None = None,
        injected_ids: set[str] | None = None,
    ) -> "Message | None":
        if self._memory is None:
            log.info("memory_facts_msg.skip", reason="no_memory_store")
            return None
        msg = user_message.strip()
        if len(msg) <= 20:
            log.info("memory_facts_msg.skip", reason="too_short", length=len(msg))
            return None
        words = [w for w in msg.split() if any(c.isalpha() for c in w)]
        if len(words) < 2:
            log.info("memory_facts_msg.skip", reason="too_few_words", words=len(words))
            return None
        if len(msg) < 50:
            limit = 1
        elif len(msg) <= 150:
            limit = 2
        else:
            limit = 3
        log.info("memory_facts_msg.search", query=msg[:60], limit=limit, user_id=user_id)
        facts = await self._memory.search_facts(msg, user_id=user_id, limit=limit)
        log.info("memory_facts_msg.hits", count=len(facts) if facts else 0)
        if not facts:
            return None
        if injected_ids is not None:
            new_facts = [f for f in facts if f.get("id") not in injected_ids]
            skipped = len(facts) - len(new_facts)
            log.info("memory_facts_msg.dedup", new=len(new_facts), skipped=skipped)
            if not new_facts:
                return None
            for f in new_facts:
                injected_ids.add(f["id"])
        else:
            new_facts = facts
        lines = ["Known facts about the user:"]
        for f in new_facts:
            conf = f.get("confidence", 70)
            lines.append(f"• [{f['category']}] {f['key']} → {f['value']} (confidence: {conf}%)")
        result = "\n".join(lines)
        log.info("memory_facts_msg.injected", lines=len(new_facts), length=len(result))
        return Message(role="system", content=result)

    async def _collect_context_injections(self, profile: "AgentProfile") -> list[Message]:
        if self._cp_registry is None:
            return []
        out: list[Message] = []
        for provider in self._cp_registry.all():
            if not provider.global_provider and provider.name not in profile.context_providers:
                continue
            try:
                text = await provider.get_context()
                if text:
                    out.append(Message(role="system", content=text))
            except Exception:
                pass
        return out

    def _build_goal_anchor(self, session_id: str, user_message: str) -> Message:
        lines = [
            "[Goal anchor]",
            f"Original request: {user_message}",
        ]
        todo_lines = render_todo_lines(session_id)
        if todo_lines:
            lines.append("Current todo:")
            lines.extend(todo_lines)
        lines.append("Stay on track — complete the remaining pending/in_progress steps.")
        lines.append("Use 1-based todo indexes. Mark completed steps done only after verification, with validation.")
        lines.append("Before final response, update todo for every completed step, including the final one.")
        return Message(role="system", content="\n".join(lines))

    def _security_policy_msg(self) -> Message | None:
        """Build a dynamic security policy system message based on user role."""
        from navi.tools.base import current_user_id as _uid_var, current_user_role as _role_var
        user_id = _uid_var.get(None)
        role = _role_var.get()
        if role == "admin":
            return Message(
                role="system",
                content=(
                    "[Security policy]\n"
                    "Role: admin\n"
                    "Full system access. No restrictions apply.\n"
                    "You may use any tool, access any path, and execute any command."
                ),
            )
        if user_id:
            allowed = _config.settings.terminal_user_allowed_commands_list
            return Message(
                role="system",
                content=(
                    "[Security policy]\n"
                    f"Role: user (user_id={user_id})\n"
                    f"Filesystem sandbox: user_data/{user_id}/\n"
                    "You MUST NOT attempt to access paths outside your sandbox.\n"
                    f"Terminal allowed commands: {', '.join(allowed)}\n"
                    "You MUST NOT use terminal for: curl, wget, ssh, scp, sudo, system-wide destructive operations, "
                    "or any command not in the allowlist.\n"
                    "If a task requires admin privileges (e.g. system-wide changes, accessing another user's files, "
                    "installing packages globally), tell the user to contact an admin.\n"
                    "Always prefer filesystem tool over terminal for file operations."
                ),
            )
        # Legacy / single-user mode — no policy injected
        return None

    def _user_context_msg(self) -> Message | None:
        """Build a [User context] system message from current_user_info ContextVar."""
        from navi.tools.base import current_user_info as _uinfo_var, current_user_role as _role_var
        info = _uinfo_var.get(None)
        if not info:
            return None
        lines = ["[User context]"]
        if info.get("display_name"):
            lines.append(f"Name: {info['display_name']}")
        if info.get("username"):
            lines.append(f"Username: {info['username']}")
        if info.get("first_name") or info.get("last_name"):
            parts = []
            if info.get("first_name"):
                parts.append(info["first_name"])
            if info.get("last_name"):
                parts.append(info["last_name"])
            lines.append(f"Full name: {' '.join(parts)}")
        if info.get("email"):
            lines.append(f"Email: {info['email']}")
        if info.get("phone"):
            lines.append(f"Phone: {info['phone']}")
        if info.get("birth_date"):
            lines.append(f"Birth date: {info['birth_date']}")
        if info.get("country") or info.get("city"):
            parts = []
            if info.get("city"):
                parts.append(info["city"])
            if info.get("country"):
                parts.append(info["country"])
            lines.append(f"Location: {', '.join(parts)}")
        if info.get("locale"):
            lines.append(f"Locale: {info['locale']}")
        lines.append(f"Role: {_role_var.get()}")
        return Message(role="system", content="\n".join(lines))

    def _mcp_context_msg(self, profile: "AgentProfile | None" = None) -> "Message | None":
        """Build a system message with MCP server instructions.

        Combines server-provided instructions (from MCP initialize handshake)
        with overlay instructions from ``mcp_servers.json``.
        """
        if not self._mcp_manager:
            return None
        if profile is not None and not profile.mcp_servers:
            return None
        server_names = set(profile.mcp_servers.keys()) if profile is not None else None
        instructions = self._mcp_manager.get_instructions(server_names)
        if not instructions:
            return None
        lines = ["[MCP servers — external knowledge sources]"]
        for name, text in instructions.items():
            lines.append(f"")
            lines.append(f"## {name}")
            lines.append(text)
        return Message(role="system", content="\n".join(lines))

    def build(
        self,
        session_context: list[Message],
        profile: "AgentProfile",
        mem: "Message | None",
        iteration: int | None = None,
        max_iterations: int | None = None,
        extra_system: list[Message] | None = None,
        session_id: str | None = None,
    ) -> list[Message]:
        system_prompt = self.build_system_prompt(profile)
        if session_id:
            system_prompt += (
                f"\n\n---\n\n"
                f"[Session context]\n"
                f"Session ID: {session_id}\n"
                f"Session files directory: {_config.settings.session_files_dir}/{session_id}/\n"
                f"When writing files the user should see, always use the session directory path above."
            )
        system_msg = Message(role="system", content=system_prompt)
        conv = [m for m in session_context if m.role != "system"]
        result: list[Message] = [system_msg]
        if mem:
            result.append(mem)

        # Inject user profile context for multi-user mode
        user_ctx = self._user_context_msg()
        if user_ctx:
            result.append(user_ctx)

        # Inject security policy for multi-user mode
        policy = self._security_policy_msg()
        if policy:
            result.append(policy)

        # Inject MCP server instructions into context
        mcp_msg = self._mcp_context_msg(profile)
        if mcp_msg:
            result.append(mcp_msg)

        if extra_system:
            result.extend(extra_system)
        result.extend(conv)

        if profile.iteration_budget_enabled and iteration is not None and max_iterations is not None:
            remaining_after_this = max_iterations - iteration - 1
            if remaining_after_this <= 2:
                urgency = (
                    f" CRITICAL: only {remaining_after_this} iteration(s) left after this one — finish or produce "
                    "a partial result now, do not start new subtasks."
                )
            elif remaining_after_this <= 5:
                urgency = (
                    " Low iteration budget: complete the current step, continue necessary "
                    "verification/publishing, and avoid starting unrelated subtasks."
                )
            else:
                urgency = ""
            result.append(Message(
                role="system",
                content=(
                    f"[Iteration {iteration + 1}/{max_iterations} — "
                    f"{remaining_after_this} iteration(s) after this one.{urgency}]"
                ),
            ))

        return result