diff --git a/client/js/app.js b/client/js/app.js index 72c506b..f418529 100644 --- a/client/js/app.js +++ b/client/js/app.js @@ -1,7 +1,9 @@ import { api } from './api.js'; import { WsClient } from './ws.js'; import { appendMessage, appendStreamBubble, finalizeStreamBubble, - appendToolCall, appendThinkingCard, finalizeThinkingCard, + appendToolCall, appendPendingToolCard, finalizeToolCard, + appendSubagentStep, finalizeSubagentStep, + appendThinkingCard, finalizeThinkingCard, appendTypingIndicator, removeTypingIndicator, appendError, showEmptyState, scrollToBottom, appendSummaryCard, appendCompressionNotice } from './chat.js'; @@ -30,6 +32,8 @@ let currentBubble = null; let currentThinking = null; let pendingImages = []; +let pendingToolCard = null; // current main-level tool card awaiting result +let pendingSubStep = null; // current sub-agent step inside pendingToolCard const ws = new WsClient(); @@ -245,8 +249,28 @@ scrollToBottom(messagesEl); break; + case 'tool_started': + removeTypingIndicator(messagesEl); + if (event.is_subagent) { + // Sub-agent step — attach to current spawn_agent card + pendingSubStep = appendSubagentStep(pendingToolCard, event); + } else { + // Main-level tool — create new pending card + pendingToolCard = appendPendingToolCard(messagesEl, event); + } + scrollToBottom(messagesEl); + break; + case 'tool_call': - appendToolCall(messagesEl, event); + if (event.is_subagent) { + // Complete the current sub-agent step + finalizeSubagentStep(pendingSubStep, event); + pendingSubStep = null; + } else { + // Complete the main-level tool card + finalizeToolCard(pendingToolCard, event); + pendingToolCard = null; + } scrollToBottom(messagesEl); break; @@ -294,6 +318,8 @@ streaming = false; currentBubble = null; currentThinking = null; + pendingToolCard = null; + pendingSubStep = null; } // ── Sending ─────────────────────────────────────────────────────────────────── diff --git a/client/js/chat.js b/client/js/chat.js index 1675c74..c54885c 100644 --- a/client/js/chat.js +++ b/client/js/chat.js @@ -22,13 +22,21 @@ // ── Tool icons ──────────────────────────────────────────────────────────────── const TOOL_ICONS = { - web_search: '🔍', - filesystem: '📁', - http_request: '🌐', - code_exec: '⚙️', - terminal: '💻', - ssh_exec: '🖧', - image_view: '🖼️', + web_search: '🔍', + web_view: '🌍', + filesystem: '📁', + http_request: '🌐', + code_exec: '⚙️', + terminal: '💻', + ssh_exec: '🖧', + image_view: '🖼️', + spawn_agent: '🤖', + memory_search: '🧠', + memory_forget: '🗑️', + write_tool: '✏️', + reload_tools: '🔄', + list_tools: '📋', + tool_manual: '📖', }; // ── Public API ──────────────────────────────────────────────────────────────── @@ -105,17 +113,116 @@ bubble.appendChild(renderMarkdown(content)); } +/** Build an args grid element from a {key:val} object. Returns null if empty. */ +function buildArgsEl(args) { + const entries = Object.entries(args ?? {}); + if (!entries.length) return null; + const div = document.createElement('div'); + div.className = 'tool-args'; + div.innerHTML = entries + .map(([k, v]) => `${esc(k)}${esc(JSON.stringify(v))}`) + .join(''); + return div; +} + /** - * Tool call card — collapsed by default, click header to toggle. + * Create a pending tool card (spinner, no result yet). + * Returns the card element — pass to finalizeToolCard() when done. + */ +export function appendPendingToolCard(el, event) { + const icon = TOOL_ICONS[event.tool] ?? '🔧'; + const card = document.createElement('div'); + card.className = 'tool-card pending'; + + const header = document.createElement('div'); + header.className = 'tool-header'; + header.innerHTML = ` + ${icon} + ${esc(event.tool)} + `; + + // For spawn_agent: open body immediately to show sub-agent log as it streams + const body = document.createElement('div'); + body.className = event.tool === 'spawn_agent' ? 'tool-body tool-body-open' : 'tool-body'; + + const argsEl = buildArgsEl(event.args); + if (argsEl) body.appendChild(argsEl); + + if (event.tool === 'spawn_agent') { + const log = document.createElement('div'); + log.className = 'subagent-log'; + body.appendChild(log); + card._subagentLog = log; + } + + card.append(header, body); + el.appendChild(card); + return card; +} + +/** + * Fill in a pending card with the completed result. + */ +export function finalizeToolCard(card, event) { + const success = event.success; + card.classList.remove('pending'); + if (!success) card.classList.add('error'); + + const statusEl = card.querySelector('.tool-status'); + if (statusEl) statusEl.innerHTML = success ? '✓' : '✗'; + + const body = card.querySelector('.tool-body'); + if (body) { + body.classList.remove('tool-body-open'); + // Strip the "[Sub-agent result — ...]" reminder prefix before showing to user + const result = event.result.replace(/^\[Sub-agent result[^\]]*\]\n\n/, ''); + const pre = document.createElement('pre'); + pre.className = 'tool-result-pre'; + pre.textContent = result; + body.appendChild(pre); + } + + const header = card.querySelector('.tool-header'); + if (header) header.addEventListener('click', () => card.classList.toggle('open')); +} + +/** + * Append a pending sub-agent step inside a spawn_agent card. + * Returns the step element — pass to finalizeSubagentStep() when done. + */ +export function appendSubagentStep(card, event) { + const log = card._subagentLog; + if (!log) return null; + const icon = TOOL_ICONS[event.tool] ?? '🔧'; + const step = document.createElement('div'); + step.className = 'subagent-step pending'; + step.innerHTML = ` + + ${icon} + ${esc(event.tool)} + `; + log.appendChild(step); + return step; +} + +/** + * Mark a sub-agent step as complete. + */ +export function finalizeSubagentStep(step, event) { + if (!step) return; + step.classList.remove('pending'); + if (!event.success) step.classList.add('error'); + const statusEl = step.querySelector('.step-status'); + if (statusEl) statusEl.innerHTML = event.success ? '✓' : '✗'; +} + +/** + * Tool call card from history — complete, collapsed by default. */ export function appendToolCall(el, event) { const icon = TOOL_ICONS[event.tool] ?? '🔧'; const success = event.success; - const argsLines = Object.entries(event.args ?? {}) - .map(([k, v]) => `${esc(k)}${esc(JSON.stringify(v))}`) - .join(''); - const card = document.createElement('div'); card.className = `tool-card${success ? '' : ' error'}`; @@ -128,19 +235,14 @@ const body = document.createElement('div'); body.className = 'tool-body'; - if (argsLines) { - const argsDiv = document.createElement('div'); - argsDiv.className = 'tool-args'; - argsDiv.innerHTML = argsLines; - body.appendChild(argsDiv); - } + const argsEl = buildArgsEl(event.args); + if (argsEl) body.appendChild(argsEl); const pre = document.createElement('pre'); pre.className = 'tool-result-pre'; pre.textContent = event.result; body.appendChild(pre); header.addEventListener('click', () => card.classList.toggle('open')); - card.append(header, body); el.appendChild(card); } diff --git a/client/style.css b/client/style.css index 643d893..e05c480 100644 --- a/client/style.css +++ b/client/style.css @@ -169,6 +169,19 @@ flex-shrink: 0; } +/* Small inline spinner — used inside tool card headers */ +.spinner-inline { + display: inline-block; + width: 11px; + height: 11px; + border: 2px solid rgba(255,255,255,0.2); + border-top-color: var(--tool-text); + border-radius: 50%; + animation: spin 0.7s linear infinite; + vertical-align: middle; +} +.tool-card.error .spinner-inline { border-top-color: var(--error-text); } + /* Centred in the session list column */ .sidebar-spinner { display: flex; @@ -185,6 +198,36 @@ } .chat-spinner .spinner { width: 28px; height: 28px; border-width: 3px; } +/* ── Sub-agent step log (inside spawn_agent card) ────── */ + +.subagent-log { + display: flex; + flex-direction: column; + gap: 3px; + border-top: 1px solid var(--tool-border); + padding-top: 6px; + margin-top: 2px; +} + +.subagent-step { + display: flex; + align-items: center; + gap: 5px; + font-size: 11px; + color: var(--tool-text); + opacity: 0.85; + padding: 2px 4px; + border-radius: 4px; +} +.subagent-step.pending { opacity: 1; } +.subagent-step.error { color: var(--error-text); } +.subagent-step.done { opacity: 0.6; } + +.step-arrow { color: var(--text-muted); font-size: 10px; } +.step-icon { font-size: 12px; } +.step-name { flex: 1; } +.step-status { font-size: 11px; } + /* ── Main chat area ──────────────────────────────────── */ .chat { @@ -337,10 +380,13 @@ flex-direction: column; gap: 6px; } -.tool-card.open .tool-body { +.tool-card.open .tool-body, +.tool-card.pending .tool-body-open { display: flex; animation: fadeSlide 0.18s ease; } +/* pending: no chevron toggle while running */ +.tool-card.pending .tool-header::after { content: ''; } @keyframes fadeSlide { from { opacity: 0; transform: translateY(-4px); } to { opacity: 1; transform: translateY(0); } } .tool-args { diff --git a/navi/api/websocket.py b/navi/api/websocket.py index 1a36de9..6494c7a 100644 --- a/navi/api/websocket.py +++ b/navi/api/websocket.py @@ -21,6 +21,7 @@ from navi.api.deps import get_agent, get_session_store from navi.core import Agent, ContextCompressed, InMemorySessionStore, StreamEnd, TextDelta, ThinkingDelta, ThinkingEnd, ToolEvent +from navi.core.events import ToolStarted from navi.exceptions import MaxIterationsReached, NaviError, SessionNotFound router = APIRouter(tags=["websocket"]) @@ -81,6 +82,13 @@ await websocket.send_json({"type": "thinking_end"}) elif isinstance(event, TextDelta): await websocket.send_json({"type": "stream_delta", "delta": event.delta}) + elif isinstance(event, ToolStarted): + await websocket.send_json({ + "type": "tool_started", + "tool": event.tool_name, + "args": event.arguments, + "is_subagent": event.is_subagent, + }) elif isinstance(event, ToolEvent): await websocket.send_json({ "type": "tool_call", @@ -88,6 +96,7 @@ "args": event.arguments, "result": event.result, "success": event.success, + "is_subagent": event.is_subagent, }) elif isinstance(event, StreamEnd): await websocket.send_json({ diff --git a/navi/core/agent.py b/navi/core/agent.py index bc09aad..21da98b 100644 --- a/navi/core/agent.py +++ b/navi/core/agent.py @@ -26,7 +26,7 @@ from navi.config import settings from navi.exceptions import MaxIterationsReached, SessionNotFound from navi.llm.base import LLMBackend, Message, ToolCallRequest -from navi.tools.base import Tool +from navi.tools.base import Tool, current_event_sink from .events import ( AgentEvent, @@ -35,6 +35,7 @@ ThinkingDelta, ThinkingEnd, ToolEvent, + ToolStarted, ) from .registry import BackendRegistry, ProfileRegistry, ToolRegistry from .session import SessionStore @@ -173,8 +174,14 @@ context.append(Message(role="user", content=user_message, created_at=datetime.now(timezone.utc))) + # Read the event sink set by the parent run_stream() for this tool call. + # If None (e.g. called from run(), not run_stream()), events are silently dropped. + sink = current_event_sink.get() + log.info("agent.subagent.start", profile_id=profile_id, max_iterations=max_iterations) + tool_map = {t.name: t for t in tools} + for iteration in range(max_iterations): log.debug("agent.subagent.iteration", iteration=iteration) response = await llm.complete( @@ -190,18 +197,48 @@ result_len=len(content)) return content - assistant_msg = Message( + context.append(Message( role="assistant", content=response.content, tool_calls=response.tool_calls, - ) - context.append(assistant_msg) + )) - tool_results, image_injections = await self._execute_tool_calls( - response.tool_calls, tools - ) - context.extend(tool_results) - context.extend(image_injections) + # Execute each tool call sequentially, emitting events to parent sink + for tc in response.tool_calls: + if sink is not None: + await sink.put(ToolStarted( + tool_name=tc.name, arguments=tc.arguments, is_subagent=True + )) + + tool = tool_map.get(tc.name) + image_msg = None + if tool is None: + content = f"Error: tool '{tc.name}' not found." + success = False + else: + log.info("tool.execute.subagent", tool=tc.name, args=tc.arguments) + result = await tool.execute(tc.arguments) + content = result.to_message_content() + success = result.success + if result.success and result.metadata and result.metadata.get("is_image"): + b64 = result.metadata.get("base64") + if b64: + image_msg = Message( + role="user", + content=f"[Image loaded via {tc.name} — analyse it]", + images=[b64], + ) + + if sink is not None: + await sink.put(ToolEvent( + tool_name=tc.name, arguments=tc.arguments, + result=content, success=success, is_subagent=True, + )) + + context.append(Message(role="tool", content=content, + tool_call_id=tc.id, name=tc.name)) + if image_msg: + context.append(image_msg) log.warning("agent.subagent.max_iterations", max_iterations=max_iterations) return "[Sub-agent reached iteration limit without a final answer]" @@ -297,7 +334,7 @@ yield event return - # Tool calls: emit events, append to both messages and context + # Tool calls — sequential: announce → execute (with sink draining) → record assistant_msg = Message( role="assistant", content=response.content, @@ -306,15 +343,38 @@ session.messages.append(assistant_msg) session.context.append(assistant_msg) - tool_results_msgs, image_injections = await self._execute_tool_calls_streaming( - response.tool_calls, tools - ) - for event, msg in tool_results_msgs: - yield event + tool_map = {t.name: t for t in tools} + for tc in response.tool_calls: + # 1. Announce immediately so the UI shows a pending card + removeTypingIndicator = True + yield ToolStarted(tool_name=tc.name, arguments=tc.arguments) + + # 2. Create a sink queue for sub-agent events from this tool call. + # create_task() copies the current contextvars context, so the + # task will inherit current_event_sink = sink. + sink: asyncio.Queue = asyncio.Queue() + sink_token = current_event_sink.set(sink) + task = asyncio.create_task(self._run_single_tool(tc, tool_map)) + current_event_sink.reset(sink_token) # outer ctx restored; task has its own copy + + # 3. Drain sub-agent events while the tool executes + while not task.done(): + try: + yield sink.get_nowait() + except asyncio.QueueEmpty: + await asyncio.sleep(0.02) + + # 4. Flush any remaining events queued after task.done() was detected + while not sink.empty(): + yield sink.get_nowait() + + # 5. Yield the completed ToolEvent and record in session + tool_event, msg, image_msg = task.result() + yield tool_event session.messages.append(msg) session.context.append(msg) - # Image injections are synthetic — context only - session.context.extend(image_injections) + if image_msg: + session.context.append(image_msg) await self._sessions.save(session) raise MaxIterationsReached(profile.max_iterations) @@ -392,6 +452,39 @@ def _get_backend(self, backend_key: str) -> LLMBackend: return self._backends.get(backend_key) + async def _run_single_tool( + self, + tc: ToolCallRequest, + tool_map: dict[str, Tool], + ) -> tuple[ToolEvent, Message, "Message | None"]: + """Execute one tool call and return (ToolEvent, tool_msg, optional_image_msg). + + Called via asyncio.create_task() from run_stream() so that the parent + generator can drain the event sink queue concurrently. + """ + tool = tool_map.get(tc.name) + image_msg = None + if tool is None: + content = f"Error: tool '{tc.name}' not found." + event = ToolEvent(tool_name=tc.name, arguments=tc.arguments, + result=content, success=False) + else: + log.info("tool.execute", tool=tc.name, args=tc.arguments) + result = await tool.execute(tc.arguments) + content = result.to_message_content() + event = ToolEvent(tool_name=tc.name, arguments=tc.arguments, + result=content, success=result.success) + if result.success and result.metadata and result.metadata.get("is_image"): + b64 = result.metadata.get("base64") + if b64: + image_msg = Message( + role="user", + content=f"[Image loaded via {tc.name} — analyse it]", + images=[b64], + ) + msg = Message(role="tool", content=content, tool_call_id=tc.id, name=tc.name) + return event, msg, image_msg + async def _execute_tool_calls( self, tool_calls: list[ToolCallRequest], tools: list[Tool] ) -> tuple[list[Message], list[Message]]: diff --git a/navi/core/events.py b/navi/core/events.py index 96d4efc..77993a5 100644 --- a/navi/core/events.py +++ b/navi/core/events.py @@ -1,16 +1,26 @@ """Agent event dataclasses — emitted during run_stream() and forwarded to WebSocket clients.""" -from dataclasses import dataclass +from dataclasses import dataclass, field + + +@dataclass +class ToolStarted: + """Emitted immediately when a tool call begins, before execution completes.""" + + tool_name: str + arguments: dict + is_subagent: bool = False # True when emitted from inside run_ephemeral @dataclass class ToolEvent: - """Emitted during streaming to inform the client about tool activity.""" + """Emitted when a tool call finishes — carries the result.""" tool_name: str arguments: dict result: str success: bool + is_subagent: bool = False # True when emitted from inside run_ephemeral @dataclass @@ -49,4 +59,4 @@ messages_after: int -AgentEvent = ToolEvent | TextDelta | ThinkingDelta | ThinkingEnd | StreamEnd | ContextCompressed +AgentEvent = ToolStarted | ToolEvent | TextDelta | ThinkingDelta | ThinkingEnd | StreamEnd | ContextCompressed diff --git a/navi/tools/base.py b/navi/tools/base.py index af9ce84..e2370d5 100644 --- a/navi/tools/base.py +++ b/navi/tools/base.py @@ -8,6 +8,7 @@ Tools that need per-session state (e.g. SSH connection pool) read it here. """ +import asyncio from abc import ABC, abstractmethod from contextvars import ContextVar from dataclasses import dataclass, field @@ -17,6 +18,10 @@ # Set by Agent before every tool call. Tools that need per-session state read this. current_session_id: ContextVar[str | None] = ContextVar("current_session_id", default=None) +# Set by run_stream() before executing a tool. run_ephemeral() reads this to forward +# sub-agent tool events up to the parent WS stream. +current_event_sink: ContextVar[asyncio.Queue | None] = ContextVar("current_event_sink", default=None) + @dataclass class ToolResult: