Full protocol reference for the streaming agent interface. File: navi/api/websocket.py.
ws://host/ws/sessions/{session_id}
The session must exist before connecting (create via POST /sessions). If the session is not found, the WebSocket closes with code 4004.
Browser clients: cookies are sent automatically during the WebSocket handshake. The server resolves the user from the navi_auth_session cookie.
Headless clients: append the API token as a query parameter:
ws://host/ws/sessions/{session_id}?api_token=nav_aB3xYz9W...
No-auth mode (NAVI_AUTH_ENABLED=false): neither cookies nor tokens are required. Every WebSocket connection is treated as the local anonymous admin user.
Security note:
?api_token is visible in server access logs. For log-sensitive deployments, future versions may support sending {type: "auth", api_token: "..."} as the first WebSocket message after connect instead.
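As an illustration of the headless-client form, the URL could be assembled as in the sketch below. The helper name `build_ws_url` is hypothetical and not part of navi; only the path and the `api_token` query parameter come from this reference.

```python
from typing import Optional
from urllib.parse import quote, urlencode


def build_ws_url(host: str, session_id: str, api_token: Optional[str] = None) -> str:
    """Build the session WebSocket URL, optionally with ?api_token=... appended.

    Hypothetical helper for illustration; not part of navi.
    """
    url = f"ws://{host}/ws/sessions/{quote(session_id, safe='')}"
    if api_token:
        # The token travels in the query string, so it may show up in access logs.
        url += "?" + urlencode({"api_token": api_token})
    return url
```

Browser clients and no-auth deployments would call it without a token, since the cookie (or nothing at all) is enough there.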
On connect the server immediately sends either session_sync (no active run) or begins the reconnect flow (active run detected).
{
"type": "message",
"content": "user text",
"images": ["base64string", ...],
"files": [{"name": "file.pdf", "path": "/abs/path"}]
}
- type must be "message". Other types return an error frame.
- content is required and must be non-empty.
- images: optional list of base64-encoded images (data URIs accepted; the data:...;base64, prefix is stripped server-side). Limits: max 10 images per message, 5 MB each (decoded). Excess images are rejected with a WebSocket error.
- files: optional list of uploaded file references (appended to content as [Uploaded files on disk: ...]).

Only one agent run may be active per session at a time. If a second message arrives while a run is already in progress (either a WebSocket run or a headless recall), the server rejects it with a WebSocket error. The client should wait for stream_end before sending the next message.
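A client may want to apply the same rules before sending, so that an invalid message fails locally rather than on the server. The sketch below is one way to do that. `build_message` and `strip_data_uri` are hypothetical names, and reading "5 MB" as 5 × 2^20 bytes is an assumption; the server's exact threshold is not stated here.

```python
import base64
import json

MAX_IMAGES = 10                      # per-message limit from the rules above
MAX_IMAGE_BYTES = 5 * 1024 * 1024    # assumption: "5 MB" means 5 * 2**20 decoded bytes


def strip_data_uri(image: str) -> str:
    """Drop a leading 'data:...;base64,' prefix if one is present."""
    if image.startswith("data:") and ";base64," in image:
        return image.split(";base64,", 1)[1]
    return image


def build_message(content: str, images=(), files=()) -> str:
    """Return the JSON text of a client "message" frame, or raise ValueError."""
    if not content:
        raise ValueError("content is required and must be non-empty")
    if len(images) > MAX_IMAGES:
        raise ValueError(f"at most {MAX_IMAGES} images per message")
    cleaned = []
    for image in images:
        payload = strip_data_uri(image)
        # The size limit applies to the decoded bytes, not the base64 text.
        if len(base64.b64decode(payload)) > MAX_IMAGE_BYTES:
            raise ValueError("image exceeds the decoded size limit")
        cleaned.append(payload)
    frame = {"type": "message", "content": content}
    if cleaned:
        frame["images"] = cleaned
    if files:
        frame["files"] = list(files)
    return json.dumps(frame)
```

Stripping the data URI prefix client-side is optional, since the server does it too; it is done here only so the size check sees the same payload the server would.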
All frames are JSON objects with a type field.
| Frame | When |
|---|---|
| `{"type": "stream_start"}` | Before any agent output begins |
| `{"type": "stream_end", "content": "...", "context_tokens": N, "max_context_tokens": N, "elapsed_seconds": N, "tool_call_count": N, "token_count": N, "message_index": N}` | After final text, before workers |
| `{"type": "stream_stopped"}` | If the user stopped generation |
| `{"type": "error", "message": "..."}` | On any unhandled error |
| Frame | When |
|---|---|
| `{"type": "thinking_delta", "delta": "..."}` | Reasoning chunk during streaming |
| `{"type": "thinking_end"}` | Reasoning phase complete |
| `{"type": "turn_thinking", "thinking": "...", "is_subagent": bool}` | Full reasoning block from a tool-calling turn (complete(), non-streaming) |
Thinking blocks are collapsible in the UI: open during reasoning, auto-collapsed on thinking_end.
| Frame | When |
|---|---|
| `{"type": "planning_status", "phase": 1 \| 2 \| 3, "label": "...", "is_subagent": bool}` | During planning phase; progress label for UI. phase: 1=analysis, 2=reflect, 3=plan |
| `{"type": "plan_ready", "plan": "...", "is_subagent": bool}` | Before tool-calling loop if planning_enabled and a plan was generated |
planning_status frames arrive during each planning phase (analysis → optional reflect → plan). is_subagent: true means the planning is running inside a subagent — route it into the spawn_agent card, never into the top-level UI.
plan_ready carries the formatted step list. Rendered as a collapsible plan card in the UI.
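The routing rule for planning frames can be sketched as a small pure function. The target names `"spawn_agent_card"` and `"top_level"` and the function name are hypothetical labels chosen for this example; only the phase numbering and the `is_subagent` rule come from the text above.

```python
from typing import Tuple

# Phase numbers as documented for planning_status frames.
PHASE_NAMES = {1: "analysis", 2: "reflect", 3: "plan"}


def route_planning_frame(frame: dict) -> Tuple[str, str]:
    """Return (ui_target, display_text) for a planning_status or plan_ready frame."""
    # Subagent planning must never surface in the top-level UI.
    target = "spawn_agent_card" if frame.get("is_subagent") else "top_level"
    if frame["type"] == "planning_status":
        phase = PHASE_NAMES.get(frame["phase"], "unknown")
        return target, f"{phase}: {frame['label']}"
    if frame["type"] == "plan_ready":
        return target, frame["plan"]
    raise ValueError(f"not a planning frame: {frame['type']!r}")
```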
| Frame | When |
|---|---|
| `{"type": "tool_started", "tool": "name", "args": {...}, "is_subagent": bool}` | Immediately when a tool call begins (before execution) |
| `{"type": "tool_call", "tool": "name", "args": {...}, "result": "...", "success": bool, "is_subagent": bool, "metadata": {...}}` | When the tool finishes |
is_subagent: true indicates the tool call was made by a nested subagent, not the top-level agent.
| Frame | When |
|---|---|
| `{"type": "stream_delta", "delta": "..."}` | Text chunk of the final response |
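On the client side, the lifecycle frames and stream_delta chunks might be combined roughly as follows. `ResponseAccumulator` is a hypothetical class for illustration. Whether the `content` field of stream_end always equals the concatenated deltas is not stated in this reference, so the sketch keeps only the deltas and ignores that field.

```python
import json


class ResponseAccumulator:
    """Collects stream_delta chunks between stream_start and a terminal frame."""

    def __init__(self):
        self.chunks = []
        self.state = "idle"   # idle | streaming | done | stopped | error
        self.error = None

    def feed(self, raw: str) -> None:
        """Process one JSON text frame received from the server."""
        frame = json.loads(raw)
        kind = frame.get("type")
        if kind == "stream_start":
            self.chunks.clear()
            self.error = None
            self.state = "streaming"
        elif kind == "stream_delta":
            self.chunks.append(frame["delta"])
        elif kind == "stream_end":
            self.state = "done"
        elif kind == "stream_stopped":
            self.state = "stopped"
        elif kind == "error":
            self.error = frame.get("message")
            self.state = "error"
        # All other frame types (thinking, tools, heartbeat, ...) are ignored here.

    @property
    def text(self) -> str:
        return "".join(self.chunks)
```

A client would call `feed()` for every received frame and treat `state == "done"` as the signal that the next message may be sent.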
| Frame | When |
|---|---|
| `{"type": "compression_started", "context_tokens": N, "max_context_tokens": N}` | Immediately before context compression begins (UI can show a spinner) |
| `{"type": "context_compressed", "messages_before": N, "messages_after": N, "summary": "...", "context_tokens": N, "max_context_tokens": N}` | After context compression runs |
| `{"type": "profile_switched", "profile_id": "...", "profile_name": "..."}` | When switch_profile tool succeeds |
| `{"type": "recall_update", "session_id": "...", "recall_id": "...", "call_type": "...", "trigger_at": "...", "status": "...", "action": "..."}` | Recall state changed (scheduled, cancelled, fired, rescheduled) |
| `{"type": "mcp_status_update", "server_name": "...", "status": "connected \| disconnected", "tool_count": N, "error": "..."}` | MCP server connection status changed; broadcast to all sessions for toast notifications |
| `{"type": "heartbeat"}` | Periodic keepalive during long silent operations (every 20 s) |
| `{"type": "session_sync"}` | Client should reload session history from REST (GET /sessions/{id}) |
session_sync is sent in three situations:
POST /sessions/{session_id}/stop
Sets _AgentRun.stop_event. The agent checks this event:
aclose() on the generator)

The client sends this via fetch(), not over the WebSocket, to avoid corrupting the WebSocket receive state.
Response: {"ok": true} if a run was active, {"ok": false, "reason": "no active run"} otherwise.
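For a non-browser client, the stop call could be issued with the standard library as sketched below. The function names and the `base_url` parameter are hypothetical. How this HTTP request is authenticated is not covered in this section, so the sketch sends no credentials and would only work as-is in no-auth mode.

```python
import json
import urllib.request


def build_stop_request(base_url: str, session_id: str) -> urllib.request.Request:
    """Build (but do not send) the POST /sessions/{session_id}/stop request."""
    url = f"{base_url.rstrip('/')}/sessions/{session_id}/stop"
    # Empty body; the endpoint is identified entirely by its path.
    return urllib.request.Request(url, data=b"", method="POST")


def stop_run(base_url: str, session_id: str) -> dict:
    """Send the stop request and return the decoded JSON response body."""
    with urllib.request.urlopen(build_stop_request(base_url, session_id)) as resp:
        return json.loads(resp.read().decode("utf-8"))
```

The returned dict would be either `{"ok": True}` or `{"ok": False, "reason": "no active run"}`, matching the responses described above.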
If the client reconnects to an in-progress run (e.g. page reload mid-stream), websocket_session() detects the existing _AgentRun in _runs and replays the full event buffer before routing live events:
← stream_start
← replay_start {"type": "replay_start", "count": N}
← ev_0 ... ev_N (all buffered events replayed verbatim)
← replay_end {"type": "replay_end"}
← (live events continue from here)
...
← session_sync (after stream finishes — sync final saved state)
The client should suppress cursor animations and other in-progress effects while replay_start..replay_end is in flight.
If the client reconnects after the run has already finished, there is no active _AgentRun, so it receives only session_sync and must fetch history via REST.
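The client-side half of the reconnect flow can be sketched as a small tracker that tells the UI whether a frame is live or part of the replay. `ReplayTracker` is a hypothetical name, and comparing the replayed frame count against `count` is an optional sanity check that this reference does not require.

```python
class ReplayTracker:
    """Tracks whether incoming frames belong to a replay_start..replay_end window."""

    def __init__(self):
        self.replaying = False
        self.expected = 0   # "count" announced by replay_start
        self.replayed = 0   # frames actually seen inside the window

    def handle(self, frame: dict) -> bool:
        """Return True if in-progress effects (cursor animation etc.) may run for this frame."""
        kind = frame.get("type")
        if kind == "replay_start":
            self.replaying = True
            self.expected = frame.get("count", 0)
            self.replayed = 0
            return False
        if kind == "replay_end":
            self.replaying = False
            return False
        if self.replaying:
            # Buffered event: apply it to the UI state, but without animations.
            self.replayed += 1
            return False
        return True
```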
_runs: dict[str, _AgentRun] — global dict of active runs, keyed by session ID.
_AgentRun holds:
- task: asyncio.Task (the running agent task)
- stop_event: asyncio.Event (cooperative stop signal)
- subscribers: list[Queue] (one queue per connected WebSocket client)
- events: list[dict] (replay buffer; every serialised event dict emitted this turn)

Events are broadcast to all subscribers and appended to events. When the run finishes, _runs.pop(session_id) is called from the finally block. The subscribe-then-note-count ordering guarantees no events are missed between the two steps (single-threaded async Python).
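The subscribe-then-note-count ordering can be illustrated with a simplified stand-in for the run object. This is a sketch, not the code in navi/api/websocket.py: `AgentRunSketch`, `subscribe`, `emit`, and `demo` are hypothetical names, and only the four fields mirror the description above.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class AgentRunSketch:
    task: Optional[asyncio.Task] = None
    stop_event: asyncio.Event = field(default_factory=asyncio.Event)
    subscribers: list = field(default_factory=list)
    events: list = field(default_factory=list)

    def subscribe(self):
        """Register a queue first, then note how many events are already buffered."""
        queue = asyncio.Queue()
        self.subscribers.append(queue)
        # No await between the two steps, so no event can slip in between them.
        replay_count = len(self.events)
        return queue, replay_count

    def emit(self, event: dict) -> None:
        """Append to the replay buffer and broadcast to every subscriber."""
        self.events.append(event)
        for queue in self.subscribers:
            queue.put_nowait(event)


async def demo():
    run = AgentRunSketch()
    run.emit({"type": "stream_start"})            # emitted before anyone connects
    queue, replay_count = run.subscribe()         # a client reconnects mid-run
    run.emit({"type": "stream_delta", "delta": "hi"})
    replayed = run.events[:replay_count]          # what the replay would send
    live = await queue.get()                      # what arrives as a live event
    return replayed, live
```

Running `asyncio.run(demo())` shows that the event emitted before the subscription lands in the replay slice while the later one arrives through the queue, so each event is delivered exactly once.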