"""Build LLM context lists for agent turns.
Extracted from agent.py to reduce the Agent class surface area.
"""
from datetime import datetime, timezone
from typing import TYPE_CHECKING
import structlog
from navi.llm.base import Message
import navi.config as _config
log = structlog.get_logger()
if TYPE_CHECKING:
from navi.context_providers._loader import ContextProviderRegistry
from navi.memory.store import MemoryStore
from navi.profiles.base import AgentProfile
async def render_todo_lines(session_id: str) -> list[str]:
"""Return a list of formatted todo lines for goal anchoring."""
try:
from navi.tools.todo import render_todo_lines as _rtl
return await _rtl(session_id)
except Exception:
return []
class ContextBuilder:
"""Handles construction of the full LLM context for each turn."""
def __init__(
self,
profile_registry,
memory_store: "MemoryStore | None" = None,
cp_registry: "ContextProviderRegistry | None" = None,
mcp_manager=None,
) -> None:
self._profiles = profile_registry
self._memory = memory_store
self._cp_registry = cp_registry
self._mcp_manager = mcp_manager
self._system_prompt_cache: dict[str, str] = {}
def build_system_prompt(self, profile: "AgentProfile") -> str:
"""Build the system prompt string for a profile (cached per profile)."""
cached = self._system_prompt_cache.get(profile.id)
if cached is not None:
return cached
parts: list[str] = []
persona = _config.settings.navi_persona.strip()
if persona:
parts.append(persona)
parts.append(profile.system_prompt)
other = [p for p in self._profiles.all() if p.id != profile.id]
if other:
lines = [
"## Available profiles",
f"Current: **{profile.id}**",
]
for p in other:
desc = p.short_description or p.description
lines.append(f"· {p.id}: {desc}")
lines.append(
"→ Switch profiles on your own judgment — do not ask for permission. "
"When a task clearly fits another profile, call switch_profile immediately, "
"then inform the user which profile is now active and why. "
"Use list_profiles if you need details about a profile's capabilities."
)
parts.append("\n".join(lines))
result = "\n\n---\n\n".join(parts)
self._system_prompt_cache[profile.id] = result
return result
def invalidate_system_prompt_cache(self, profile_id: str | None = None) -> None:
if profile_id is None:
self._system_prompt_cache.clear()
else:
self._system_prompt_cache.pop(profile_id, None)
async def _memory_msg(self, user_id: str | None = None) -> "Message | None":
if self._memory is None:
return None
summary = await self._memory.get_summary(user_id=user_id)
if not summary:
return None
return Message(
role="system",
content=f"## What I remember about the user\n\n{summary}",
)
async def _memory_facts_msg(
self,
user_message: str,
user_id: str | None = None,
injected_ids: set[str] | None = None,
) -> "Message | None":
if self._memory is None:
log.info("memory_facts_msg.skip", reason="no_memory_store")
return None
msg = user_message.strip()
if len(msg) <= 20:
log.info("memory_facts_msg.skip", reason="too_short", length=len(msg))
return None
words = [w for w in msg.split() if any(c.isalpha() for c in w)]
if len(words) < 2:
log.info("memory_facts_msg.skip", reason="too_few_words", words=len(words))
return None
if len(msg) < 50:
limit = 1
elif len(msg) <= 150:
limit = 2
else:
limit = 3
log.info("memory_facts_msg.search", query=msg[:60], limit=limit, user_id=user_id)
facts = await self._memory.search_facts(msg, user_id=user_id, limit=limit)
log.info("memory_facts_msg.hits", count=len(facts) if facts else 0)
if not facts:
return None
if injected_ids is not None:
new_facts = [f for f in facts if f.get("id") not in injected_ids]
skipped = len(facts) - len(new_facts)
log.info("memory_facts_msg.dedup", new=len(new_facts), skipped=skipped)
if not new_facts:
return None
for f in new_facts:
injected_ids.add(f["id"])
else:
new_facts = facts
lines = ["Known facts about the user:"]
for f in new_facts:
conf = f.get("confidence", 70)
lines.append(f"• [{f['category']}] {f['key']} → {f['value']} (confidence: {conf}%)")
result = "\n".join(lines)
log.info("memory_facts_msg.injected", lines=len(new_facts), length=len(result))
return Message(role="system", content=result)
async def _collect_context_injections(self, profile: "AgentProfile") -> list[Message]:
if self._cp_registry is None:
return []
out: list[Message] = []
for provider in self._cp_registry.all():
if not provider.global_provider and provider.name not in profile.context_providers:
continue
try:
text = await provider.get_context()
if text:
out.append(Message(role="system", content=text))
except Exception:
pass
return out
async def _build_goal_anchor(self, session_id: str, user_message: str) -> Message:
lines = [
"[Goal anchor]",
f"Original request: {user_message}",
]
todo_lines = await render_todo_lines(session_id)
if todo_lines:
lines.append("Current todo:")
lines.extend(todo_lines)
lines.append("Stay on track — complete the remaining pending/in_progress steps.")
lines.append("Use 1-based todo indexes. Mark completed steps done only after verification, with validation.")
lines.append("Before final response, update todo for every completed step, including the final one.")
return Message(role="system", content="\n".join(lines))
def _security_policy_msg(self) -> Message | None:
"""Build a dynamic security policy system message based on user role."""
from navi.tools._internal.base import current_user_id as _uid_var, current_user_role as _role_var
user_id = _uid_var.get(None)
role = _role_var.get()
if role == "admin":
return Message(
role="system",
content=(
"[Security policy]\n"
"Role: admin\n"
"Full system access. No restrictions apply.\n"
"You may use any tool, access any path, and execute any command."
),
)
if user_id:
allowed = _config.settings.terminal_user_allowed_commands_list
return Message(
role="system",
content=(
"[Security policy]\n"
f"Role: user (user_id={user_id})\n"
f"Filesystem sandbox: user_data/{user_id}/\n"
"You MUST NOT attempt to access paths outside your sandbox.\n"
f"Terminal allowed commands: {', '.join(allowed)}\n"
"You MUST NOT use terminal for: curl, wget, ssh, scp, sudo, system-wide destructive operations, "
"or any command not in the allowlist.\n"
"If a task requires admin privileges (e.g. system-wide changes, accessing another user's files, "
"installing packages globally), tell the user to contact an admin.\n"
"Always prefer filesystem tool over terminal for file operations."
),
)
# Legacy / single-user mode — no policy injected
return None
def _user_context_msg(self) -> Message | None:
"""Build a [User context] system message from current_user_info ContextVar."""
from navi.tools._internal.base import current_user_info as _uinfo_var, current_user_role as _role_var
info = _uinfo_var.get(None)
if not info:
return None
lines = ["[User context]"]
if info.get("display_name"):
lines.append(f"Name: {info['display_name']}")
if info.get("username"):
lines.append(f"Username: {info['username']}")
if info.get("first_name") or info.get("last_name"):
parts = []
if info.get("first_name"):
parts.append(info["first_name"])
if info.get("last_name"):
parts.append(info["last_name"])
lines.append(f"Full name: {' '.join(parts)}")
if info.get("email"):
lines.append(f"Email: {info['email']}")
if info.get("phone"):
lines.append(f"Phone: {info['phone']}")
if info.get("birth_date"):
lines.append(f"Birth date: {info['birth_date']}")
if info.get("country") or info.get("city"):
parts = []
if info.get("city"):
parts.append(info["city"])
if info.get("country"):
parts.append(info["country"])
lines.append(f"Location: {', '.join(parts)}")
if info.get("locale"):
lines.append(f"Locale: {info['locale']}")
lines.append(f"Role: {_role_var.get()}")
return Message(role="system", content="\n".join(lines))
def _mcp_context_msg(self, profile: "AgentProfile | None" = None) -> "Message | None":
"""Build a system message with MCP server instructions.
Combines server-provided instructions (from MCP initialize handshake)
with overlay instructions from ``mcp_servers.d/*.json``.
"""
if not self._mcp_manager:
return None
if profile is not None and not profile.mcp_servers:
return None
server_names = set(profile.mcp_servers.keys()) if profile is not None else None
instructions = self._mcp_manager.get_instructions(server_names)
if not instructions:
return None
lines = ["[MCP servers — external knowledge sources]"]
for name, text in instructions.items():
lines.append(f"")
lines.append(f"## {name}")
lines.append(text)
return Message(role="system", content="\n".join(lines))
def build(
self,
session_context: list[Message],
profile: "AgentProfile",
mem: "Message | None",
iteration: int | None = None,
max_iterations: int | None = None,
extra_system: list[Message] | None = None,
session_id: str | None = None,
) -> list[Message]:
system_prompt = self.build_system_prompt(profile)
if session_id:
system_prompt += (
f"\n\n---\n\n"
f"[Session context]\n"
f"Session ID: {session_id}\n"
f"Session files directory: {_config.settings.session_files_dir}/{session_id}/\n"
f"When writing files the user should see, always use the session directory path above."
)
system_msg = Message(role="system", content=system_prompt)
conv = [m for m in session_context if m.role != "system"]
result: list[Message] = [system_msg]
if mem:
result.append(mem)
# Inject user profile context for multi-user mode
user_ctx = self._user_context_msg()
if user_ctx:
result.append(user_ctx)
# Inject security policy for multi-user mode
policy = self._security_policy_msg()
if policy:
result.append(policy)
# Inject MCP server instructions into context
mcp_msg = self._mcp_context_msg(profile)
if mcp_msg:
result.append(mcp_msg)
if extra_system:
result.extend(extra_system)
result.extend(conv)
if profile.iteration_budget_enabled and iteration is not None and max_iterations is not None:
remaining_after_this = max_iterations - iteration - 1
if remaining_after_this <= 2:
urgency = (
f" CRITICAL: only {remaining_after_this} iteration(s) left after this one — finish or produce "
"a partial result now, do not start new subtasks."
)
elif remaining_after_this <= 5:
urgency = (
" Low iteration budget: complete the current step, continue necessary "
"verification/publishing, and avoid starting unrelated subtasks."
)
else:
urgency = ""
result.append(Message(
role="system",
content=(
f"[Iteration {iteration + 1}/{max_iterations} — "
f"{remaining_after_this} iteration(s) after this one.{urgency}]"
),
))
return result