Full protocol reference for the streaming agent interface. File: navi/api/websocket.py.
ws://host/ws/sessions/{session_id}
The session must exist before connecting (create via POST /sessions). If the session is not found, the WebSocket closes with code 4004.
Browser clients: cookies are sent automatically during the WebSocket handshake. The server resolves the user from the navi_auth_session cookie.
Headless clients: append the API token as a query parameter:
ws://host/ws/sessions/{session_id}?api_token=nav_aB3xYz9W...
Security note:
`?api_token` is visible in server access logs. For log-sensitive deployments, future versions may support sending `{type: "auth", api_token: "..."}` as the first WebSocket message after connect instead.
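As a rough illustration of the two connection modes, the helper below builds the WebSocket URL with or without the token query parameter. The function name `build_ws_url` and the example host are assumptions made for this sketch; only the URL shape comes from this reference.

```python
from typing import Optional
from urllib.parse import quote, urlencode


def build_ws_url(host: str, session_id: str, api_token: Optional[str] = None) -> str:
    """Return the session WebSocket URL (hypothetical helper, not part of navi)."""
    url = f"ws://{host}/ws/sessions/{quote(session_id, safe='')}"
    if api_token:
        # Headless clients pass the token in the query string.
        # Browser clients omit it and rely on the navi_auth_session cookie.
        url += "?" + urlencode({"api_token": api_token})
    return url
```

For example, `build_ws_url("localhost:8000", "abc", "nav_tok")` yields `ws://localhost:8000/ws/sessions/abc?api_token=nav_tok`.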
On connect the server immediately sends either session_sync (no active run) or begins the reconnect flow (active run detected).
{
"type": "message",
"content": "user text",
"images": ["base64string", ...],
"files": [{"name": "file.pdf", "path": "/abs/path"}]
}
- `type` must be "message". Other types return an error frame.
- `content` is required and must be non-empty.
- `images`: optional list of base64-encoded images (data URIs accepted; the data:...;base64, prefix is stripped server-side). Limits: max 10 images per message, 5 MB each (decoded). Excess images are rejected with a WebSocket error.
- `files`: optional list of uploaded file references (appended to content as [Uploaded files on disk: ...]).

{
"type": "form_submit",
"form_id": "form_abc123",
"values": {"district": "Pechersk", "budget": 80000}
}
- `form_id` must match the `form_id` from the rendered form payload.
- `values` is a JSON object mapping field names to submitted values.
- The server records the submission as a role="user" message (is_display=False, is_context=True) with the JSON payload and starts an agent turn. The message is persisted to the database but is never shown in the chat UI.

Only one agent run may be active per session at a time. If a second message (or form_submit) arrives while a run is already in progress (either a WebSocket run or a headless recall), the server rejects it with a WebSocket error. The client should wait for stream_end before sending the next message.
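The limits on message frames described earlier (non-empty content, at most 10 images, 5 MB each after decoding) can be checked client-side before sending. The sketch below is one possible pre-flight check, not the server's validation code. The helper names are hypothetical, and reading "5 MB" as 5 × 1024 × 1024 bytes is an assumption.

```python
import base64
from typing import Optional

MAX_IMAGES = 10                    # per message, as documented
MAX_IMAGE_BYTES = 5 * 1024 * 1024  # assumption: "5 MB" interpreted as 5 MiB


def strip_data_uri(image: str) -> str:
    """Drop a leading data:...;base64, prefix if one is present."""
    if image.startswith("data:") and ";base64," in image:
        return image.split(";base64,", 1)[1]
    return image


def build_message_frame(content: str, images: Optional[list[str]] = None) -> dict:
    """Validate the documented constraints and return a message frame dict."""
    if not content:
        raise ValueError("content is required and must be non-empty")
    cleaned = [strip_data_uri(img) for img in (images or [])]
    if len(cleaned) > MAX_IMAGES:
        raise ValueError(f"at most {MAX_IMAGES} images per message")
    for index, img in enumerate(cleaned):
        # The limit applies to the decoded size, not the base64 length.
        if len(base64.b64decode(img)) > MAX_IMAGE_BYTES:
            raise ValueError(f"image {index} exceeds the decoded size limit")
    frame = {"type": "message", "content": content}
    if cleaned:
        frame["images"] = cleaned
    return frame
```

Stripping the data URI prefix locally is optional, since the server accepts data URIs, but it keeps the size check simple.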
All frames are JSON objects with a type field.
| Frame | When |
|---|---|
| `{"type": "stream_start"}` | Before any agent output begins |
| `{"type": "stream_end", "content": "...", "context_tokens": N, "max_context_tokens": N, "elapsed_seconds": N, "tool_call_count": N, "token_count": N, "message_index": N}` | After final text, before workers |
| `{"type": "stream_stopped"}` | If the user stopped generation |
| `{"type": "error", "message": "..."}` | On any unhandled error |
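Because only one run may be active per session, a client can track run state from the lifecycle frames and hold back new messages until the run ends. The class below is a minimal sketch of that idea. `RunGate` is a hypothetical name, and treating `stream_stopped` as terminal is an assumption. An `error` frame deliberately does not release the gate, since the server also uses errors to reject a message sent during an active run.

```python
class RunGate:
    """Client-side guard against sending while a run is in progress (sketch)."""

    # Assumption: stream_stopped ends the run just like stream_end does.
    _TERMINAL = {"stream_end", "stream_stopped"}

    def __init__(self) -> None:
        self.active = False

    def mark_sent(self) -> None:
        # Call right after sending a message or form_submit frame,
        # so the gate closes even before stream_start arrives.
        self.active = True

    def on_frame(self, frame: dict) -> None:
        kind = frame.get("type")
        if kind == "stream_start":
            self.active = True
        elif kind in self._TERMINAL:
            self.active = False

    def can_send(self) -> bool:
        return not self.active
```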
| Frame | When |
|---|---|
| `{"type": "thinking_delta", "delta": "..."}` | Reasoning chunk during streaming |
| `{"type": "thinking_end"}` | Reasoning phase complete |
| `{"type": "turn_thinking", "thinking": "...", "is_subagent": bool}` | Full reasoning block from a tool-calling turn (complete(), non-streaming) |
Thinking blocks are collapsible in the UI: open during reasoning, auto-collapsed on thinking_end.
| Frame | When |
|---|---|
| `{"type": "planning_status", "phase": 1 \| 2 \| 3, "label": "...", "is_subagent": bool}` | During planning phase; progress label for UI. phase: 1=analysis, 2=reflect, 3=plan |
| `{"type": "plan_ready", "plan": "...", "is_subagent": bool}` | Before tool-calling loop if planning_enabled and a plan was generated |
planning_status frames arrive during each planning phase (analysis → optional reflect → plan). is_subagent: true means the planning is running inside a subagent — route it into the spawn_agent card, never into the top-level UI.
plan_ready carries the formatted step list. Rendered as a collapsible plan card in the UI.
| Frame | When |
|---|---|
| `{"type": "tool_started", "tool": "name", "args": {...}, "is_subagent": bool}` | Immediately when a tool call begins (before execution) |
| `{"type": "tool_call", "tool": "name", "args": {...}, "result": "...", "success": bool, "is_subagent": bool, "metadata": {...}}` | When the tool finishes |
is_subagent: true indicates the tool call was made by a nested subagent, not the top-level agent.
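The tool frames carry no call identifier, so a UI that wants to turn a "running" card into a "finished" card has to pair `tool_started` with the later `tool_call` itself. The sketch below uses a heuristic that is purely an assumption of this example: it matches on tool name, args, and `is_subagent`, taking the oldest pending entry first. `ToolCallTracker` is a hypothetical name.

```python
import json


class ToolCallTracker:
    """Pairs tool_started frames with their tool_call results (heuristic sketch)."""

    def __init__(self) -> None:
        self.pending: list[dict] = []
        self.finished: list[dict] = []

    @staticmethod
    def _key(frame: dict) -> tuple:
        # Serialise args with sorted keys so equal dicts compare equal.
        args = json.dumps(frame.get("args"), sort_keys=True)
        return (frame.get("tool"), args, bool(frame.get("is_subagent")))

    def on_frame(self, frame: dict) -> None:
        kind = frame.get("type")
        if kind == "tool_started":
            self.pending.append(frame)
        elif kind == "tool_call":
            key = self._key(frame)
            # Remove the oldest pending start that matches this result.
            for index, started in enumerate(self.pending):
                if self._key(started) == key:
                    del self.pending[index]
                    break
            self.finished.append(frame)
```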
| Frame | When |
|---|---|
| `{"type": "stream_delta", "delta": "..."}` | Text chunk of the final response |
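On the client, delta frames are typically concatenated in arrival order. The following sketch accumulates both `stream_delta` text and `thinking_delta` reasoning, and tracks whether the thinking block should be shown open. `StreamAccumulator` is a hypothetical name, and the open/collapsed flag is only a stand-in for real UI state.

```python
class StreamAccumulator:
    """Collects streamed text and reasoning chunks (client-side sketch)."""

    def __init__(self) -> None:
        self.text = ""
        self.thinking = ""
        self.thinking_open = False

    def on_frame(self, frame: dict) -> None:
        kind = frame.get("type")
        if kind == "thinking_delta":
            self.thinking += frame["delta"]
            self.thinking_open = True    # block stays open while reasoning streams
        elif kind == "thinking_end":
            self.thinking_open = False   # auto-collapse on thinking_end
        elif kind == "stream_delta":
            self.text += frame["delta"]
```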
| Frame | When |
|---|---|
| `{"type": "compression_started", "context_tokens": N, "max_context_tokens": N}` | Immediately before context compression begins (UI can show a spinner) |
| `{"type": "context_compressed", "messages_before": N, "messages_after": N, "summary": "...", "context_tokens": N, "max_context_tokens": N}` | After context compression runs |
| `{"type": "profile_switched", "profile_id": "...", "profile_name": "..."}` | When switch_profile tool succeeds |
| `{"type": "recall_update", "session_id": "...", "recall_id": "...", "call_type": "...", "trigger_at": "...", "status": "...", "action": "..."}` | Recall state changed (scheduled, cancelled, fired, rescheduled) |
| `{"type": "mcp_status_update", "server_name": "...", "status": "connected \| disconnected", "tool_count": N, "error": "..."}` | MCP server connection status changed; broadcast to all sessions for toast notifications |
| `{"type": "heartbeat"}` | Periodic keepalive during long silent operations (every 20 s) |
| `{"type": "session_sync"}` | Client should reload session history from REST (GET /sessions/{id}) |
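Since the server emits a heartbeat every 20 s during silent operations, a client can treat a long gap between frames as a sign of a dead connection. The watchdog below is a sketch of that idea. The class name is hypothetical, and the 60 s default (three missed heartbeats) is an assumption of this example, not a documented value.

```python
import time
from typing import Callable


class HeartbeatWatchdog:
    """Flags a connection as stale when no frame arrives for too long (sketch)."""

    def __init__(self, timeout_s: float = 60.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._timeout_s = timeout_s
        self._clock = clock          # injectable so the logic can be tested
        self._last_seen = clock()

    def on_frame(self, frame: dict) -> None:
        # Any frame, including heartbeat, proves the connection is alive.
        self._last_seen = self._clock()

    def is_stale(self) -> bool:
        return self._clock() - self._last_seen > self._timeout_s
```

The check only makes sense while a run is active; an idle session is not documented to send heartbeats.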
session_sync is sent in three situations:
POST /sessions/{session_id}/stop
Sets _AgentRun.stop_event. The agent checks this event:
aclose() on the generator)

The client sends this via fetch(), not over the WebSocket, to avoid corrupting the WebSocket receive state.
Response: {"ok": true} if a run was active, {"ok": false, "reason": "no active run"} otherwise.
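For a headless client, the stop call is a plain HTTP POST. The sketch below prepares the request with the standard library and interprets the documented response body. The helper names and the base URL are assumptions, and authentication (cookie or API token) is left out because this section does not specify how the REST endpoint expects it.

```python
import json
import urllib.request


def build_stop_request(base_url: str, session_id: str) -> urllib.request.Request:
    """Prepare POST /sessions/{session_id}/stop (hypothetical helper)."""
    url = f"{base_url.rstrip('/')}/sessions/{session_id}/stop"
    return urllib.request.Request(url, method="POST")


def was_run_stopped(body: bytes) -> bool:
    """True for {"ok": true}; False for {"ok": false, "reason": ...}."""
    return bool(json.loads(body).get("ok"))
```

A caller would pass the request to `urllib.request.urlopen()` and feed the response bytes to `was_run_stopped()`.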
If the client reconnects to an in-progress run (e.g. page reload mid-stream), websocket_session() detects the existing _AgentRun in _runs and replays the full event buffer before routing live events:
← stream_start
← replay_start {"type": "replay_start", "count": N}
← ev_0 ... ev_N (all buffered events replayed verbatim)
← replay_end {"type": "replay_end"}
← (live events continue from here)
...
← session_sync (after stream finishes — sync final saved state)
The client should suppress cursor animations and other in-progress effects while replay_start..replay_end is in flight.
If the client reconnects after the run has already finished, there is no active _AgentRun, so it receives only session_sync and must fetch history via REST.
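The reconnect flow above can be handled on the client with a small state machine that knows whether a frame belongs to the replay window. This is a sketch under the assumption that every frame between `replay_start` and `replay_end` is a replayed event. `ReplayTracker` and its return convention are hypothetical.

```python
class ReplayTracker:
    """Tells the UI when to suppress in-progress effects during replay (sketch)."""

    def __init__(self) -> None:
        self.replaying = False
        self.expected = 0   # count announced by replay_start
        self.seen = 0       # replayed events received so far

    def on_frame(self, frame: dict) -> bool:
        """Return True if the frame should be handled without animations."""
        kind = frame.get("type")
        if kind == "replay_start":
            self.replaying = True
            self.expected = frame.get("count", 0)
            self.seen = 0
            return True
        if kind == "replay_end":
            self.replaying = False
            return True
        if self.replaying:
            self.seen += 1
        return self.replaying
```

Comparing `seen` with `expected` at `replay_end` gives a cheap sanity check that no buffered event was dropped.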
_runs: dict[str, _AgentRun] — global dict of active runs, keyed by session ID.
_AgentRun holds:
- task (asyncio.Task): the running agent task
- stop_event (asyncio.Event): cooperative stop signal
- subscribers (list[Queue]): one queue per connected WebSocket client
- events (list[dict]): replay buffer; every serialised event dict emitted this turn

Events are broadcast to all subscribers and appended to events. When the run finishes, _runs.pop(session_id) is called from the finally block. Subscribing first and then noting the event count guarantees that no events are missed between the two steps, because async Python is single-threaded.
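To make the broadcast and replay bookkeeping concrete, here is a simplified stand-in for the structure described above. It is not the code in navi/api/websocket.py: the class name `AgentRunSketch` and both method names are assumptions, and the `task` field is omitted to keep the example self-contained.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class AgentRunSketch:
    """Illustrative stand-in for _AgentRun (task field omitted)."""

    stop_event: asyncio.Event = field(default_factory=asyncio.Event)
    subscribers: list[asyncio.Queue] = field(default_factory=list)
    events: list[dict] = field(default_factory=list)

    def broadcast(self, event: dict) -> None:
        # Append to the replay buffer, then fan out to every live client.
        self.events.append(event)
        for queue in self.subscribers:
            queue.put_nowait(event)

    def subscribe(self) -> tuple[asyncio.Queue, list[dict]]:
        """Subscribe first, then snapshot the events that need replaying."""
        queue: asyncio.Queue = asyncio.Queue()
        self.subscribers.append(queue)
        # No await separates these two statements, so no broadcast can run
        # between them: later events land in the queue, earlier ones in replay.
        replay = list(self.events)
        return queue, replay
```

A reconnecting handler would send the `replay` list between replay_start and replay_end and then drain the queue for live events.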