Full protocol reference for the streaming agent interface. File: navi/api/websocket.py.
ws://host/ws/sessions/{session_id}
The session must exist before connecting (create via POST /sessions). If the session is not found, the WebSocket closes with code 4004.
On connect the server immediately sends either session_sync (no active run) or begins the reconnect flow (active run detected).
{
"type": "message",
"content": "user text",
"images": ["base64string", ...],
"files": [{"name": "file.pdf", "path": "/abs/path"}]
}
- type must be "message". Other types return an error frame.
- content is required and must be non-empty.
- images: optional list of base64-encoded images (data URIs accepted; the data:...;base64, prefix is stripped server-side).
- files: optional list of uploaded file references (appended to content as [Uploaded files on disk: ...]).

All frames are JSON objects with a type field.
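
The client message rules above can be sketched as a small builder. This is an illustration only: `build_message` and `strip_data_uri` are hypothetical helper names, and the prefix stripping shown here merely mirrors what the server is described as doing, it is not the server's actual code.

```python
import json


def strip_data_uri(image: str) -> str:
    """Return the bare base64 payload, dropping a leading data:...;base64, prefix."""
    if image.startswith("data:") and ";base64," in image:
        return image.split(";base64,", 1)[1]
    return image


def build_message(content: str, images=None, files=None) -> str:
    """Serialise a client frame; raises ValueError when content is empty.

    Assumption: only the empty string is rejected here. Whether the server
    also rejects whitespace-only content is not stated in this reference.
    """
    if not content:
        raise ValueError("content is required and must be non-empty")
    frame = {"type": "message", "content": content}
    if images:
        frame["images"] = list(images)
    if files:
        # Each file reference carries a display name and an on-disk path.
        frame["files"] = [{"name": f["name"], "path": f["path"]} for f in files]
    return json.dumps(frame)
```

The resulting string is what a client would send as a single text frame over the WebSocket.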

| Frame | When |
|---|---|
| `{"type": "stream_start"}` | Before any agent output begins |
| `{"type": "stream_end", "content": "...", "context_tokens": N, "max_context_tokens": N, "elapsed_seconds": N, "tool_call_count": N, "token_count": N, "message_index": N}` | After final text, before workers |
| `{"type": "stream_stopped"}` | If the user stopped generation |
| `{"type": "error", "message": "..."}` | On any unhandled error |

| Frame | When |
|---|---|
| `{"type": "thinking_delta", "delta": "..."}` | Reasoning chunk during streaming |
| `{"type": "thinking_end"}` | Reasoning phase complete |
| `{"type": "turn_thinking", "thinking": "...", "is_subagent": bool}` | Full reasoning block from a tool-calling turn (complete(), non-streaming) |

Thinking blocks are collapsible in the UI: open during reasoning, auto-collapsed on thinking_end.
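
A client-side model of this open-then-collapse behaviour might look like the sketch below. `ThinkingBlock` is a hypothetical name, and treating a `turn_thinking` block as already collapsed is an assumption, since the reference only specifies the collapse for `thinking_end`.

```python
from dataclasses import dataclass


@dataclass
class ThinkingBlock:
    """Accumulates reasoning text and tracks the collapsed state of its card."""

    text: str = ""
    collapsed: bool = False

    def apply(self, frame: dict) -> None:
        kind = frame.get("type")
        if kind == "thinking_delta":
            # Streaming chunk: append and keep the block open.
            self.text += frame["delta"]
            self.collapsed = False
        elif kind == "thinking_end":
            # Reasoning phase complete: auto-collapse.
            self.collapsed = True
        elif kind == "turn_thinking":
            # Non-streaming block delivered whole.
            # Assumption: shown collapsed, because it is already complete.
            self.text = frame["thinking"]
            self.collapsed = True
```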

| Frame | When |
|---|---|
| `{"type": "planning_status", "phase": 1 \| 2 \| 3, "label": "...", "is_subagent": bool}` | During planning phase; progress label for UI. phase: 1=analysis, 2=reflect, 3=plan |
| `{"type": "plan_ready", "plan": "...", "is_subagent": bool}` | Before tool-calling loop if planning_enabled and a plan was generated |

planning_status frames arrive during each planning phase (analysis → optional reflect → plan). is_subagent: true means the planning is running inside a subagent — route it into the spawn_agent card, never into the top-level UI.
plan_ready carries the formatted step list. Rendered as a collapsible plan card in the UI.
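
The routing rule for planning frames can be sketched as follows. The function name and the two target labels (`"spawn_agent_card"`, `"top_level"`) are illustrative placeholders, not identifiers from the actual UI code.

```python
# Phase numbers as documented for planning_status frames.
PHASE_NAMES = {1: "analysis", 2: "reflect", 3: "plan"}


def route_planning_frame(frame: dict):
    """Return (target, text) for a planning_status or plan_ready frame."""
    # Subagent planning must never reach the top-level UI.
    target = "spawn_agent_card" if frame.get("is_subagent") else "top_level"
    if frame["type"] == "planning_status":
        phase = PHASE_NAMES.get(frame["phase"], "unknown")
        return target, f"{phase}: {frame['label']}"
    if frame["type"] == "plan_ready":
        return target, frame["plan"]
    raise ValueError(f"not a planning frame: {frame['type']!r}")
```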

| Frame | When |
|---|---|
| `{"type": "tool_started", "tool": "name", "args": {...}, "is_subagent": bool}` | Immediately when a tool call begins (before execution) |
| `{"type": "tool_call", "tool": "name", "args": {...}, "result": "...", "success": bool, "is_subagent": bool, "metadata": {...}}` | When the tool finishes |

is_subagent: true indicates the tool call was made by a nested subagent, not the top-level agent.
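
One way a client could track which tools are still running is sketched below. The frames shown above carry no call identifier, so this sketch pairs `tool_started` with `tool_call` by tool name and `is_subagent` in first-in, first-out order. That pairing rule is an assumption made for illustration, and `ToolTracker` is a hypothetical name.

```python
from collections import defaultdict, deque


class ToolTracker:
    """Pairs tool_started frames with their finishing tool_call frames."""

    def __init__(self):
        # (tool name, is_subagent) -> queue of args for calls still running
        self._pending = defaultdict(deque)
        self.finished = []  # list of (tool name, success) in completion order

    def apply(self, frame: dict) -> None:
        key = (frame.get("tool"), bool(frame.get("is_subagent")))
        if frame["type"] == "tool_started":
            self._pending[key].append(frame.get("args", {}))
        elif frame["type"] == "tool_call":
            # Assumed FIFO pairing: the oldest started call finishes first.
            if self._pending[key]:
                self._pending[key].popleft()
            self.finished.append((key[0], frame["success"]))

    def running(self):
        """Names of tools that have started but not yet finished."""
        return sorted(name for (name, _), calls in self._pending.items() if calls)
```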

| Frame | When |
|---|---|
| `{"type": "stream_delta", "delta": "..."}` | Text chunk of the final response |

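
Taken together with the lifecycle frames, a client might fold one run's frames into a final text and status roughly as follows. `collect_response` is a hypothetical helper. Preferring the `content` field of `stream_end` over the concatenated deltas is an assumption, since this reference does not state that the two are always identical.

```python
def collect_response(frames):
    """Fold the frames of one run into (text, status)."""
    chunks = []
    status = "incomplete"
    for frame in frames:
        kind = frame.get("type")
        if kind == "stream_start":
            chunks.clear()
            status = "streaming"
        elif kind == "stream_delta":
            chunks.append(frame["delta"])
        elif kind == "stream_end":
            # Assumption: trust the final content, fall back to the deltas.
            return frame.get("content") or "".join(chunks), "done"
        elif kind == "stream_stopped":
            # User stopped generation: keep whatever text arrived so far.
            return "".join(chunks), "stopped"
        elif kind == "error":
            return frame.get("message", ""), "error"
        # Other frame types (thinking, tools, heartbeat) are ignored here.
    return "".join(chunks), status
```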

| Frame | When |
|---|---|
| `{"type": "context_compressed", "messages_before": N, "messages_after": N, "summary": "...", "context_tokens": N, "max_context_tokens": N}` | After context compression runs |
| `{"type": "profile_switched", "profile_id": "...", "profile_name": "..."}` | When switch_profile tool succeeds |
| `{"type": "heartbeat"}` | Periodic keepalive during long silent operations (every 20 s) |
| `{"type": "session_sync"}` | Client should reload session history from REST (GET /sessions/{id}) |

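
The 20 s heartbeat lets a client detect a dead connection while a run is active. A minimal watchdog sketch follows. The `Liveness` class and its grace factor of 2.5 are assumptions, not part of the protocol. Because heartbeats are only described for long silent operations during a run, the check is not meaningful while the session is idle.

```python
import time

HEARTBEAT_INTERVAL = 20.0  # seconds, as documented for heartbeat frames


class Liveness:
    """Flags the connection as stale when no frame arrives for too long."""

    def __init__(self, grace_factor: float = 2.5, clock=time.monotonic):
        self._clock = clock
        # Hypothetical tolerance: allow a couple of missed heartbeats.
        self._timeout = HEARTBEAT_INTERVAL * grace_factor
        self._last_seen = clock()

    def on_frame(self, frame: dict) -> None:
        # Any frame, heartbeat or not, shows the connection is alive.
        self._last_seen = self._clock()

    def is_stale(self) -> bool:
        return self._clock() - self._last_seen > self._timeout
```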
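session_sync is sent in two situations: on connect, when no run is active for the session, and after a run that the client reconnected to has finished, to sync the final saved state.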
POST /sessions/{session_id}/stop
Sets _AgentRun.stop_event. The agent checks this event:
aclose() on the generator)

The client sends this via fetch(), not over the WebSocket, to avoid corrupting the WebSocket receive state.
Response: {"ok": true} if a run was active, {"ok": false, "reason": "no active run"} otherwise.
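
For a non-browser client, the stop call and its two documented responses could be handled as in this sketch. It uses only the standard library; the helper names and the base URL in the usage note are hypothetical, and sending an empty request body is an assumption.

```python
import json
import urllib.request


def build_stop_request(base_url: str, session_id: str) -> urllib.request.Request:
    """Build the POST /sessions/{session_id}/stop request (sent over HTTP, not the WebSocket)."""
    url = f"{base_url.rstrip('/')}/sessions/{session_id}/stop"
    # Assumption: the endpoint needs no request body.
    return urllib.request.Request(url, data=b"", method="POST")


def parse_stop_response(body: bytes):
    """Return (ok, reason); reason is None when a run was active and stopped."""
    payload = json.loads(body)
    return bool(payload.get("ok")), payload.get("reason")
```

A caller would pass the request to `urllib.request.urlopen(build_stop_request("http://localhost:8000", session_id))` and feed the response body to `parse_stop_response`.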
If the client reconnects to an in-progress run (e.g. page reload mid-stream), websocket_session() detects the existing _AgentRun in _runs and replays the full event buffer before routing live events:
← stream_start
← replay_start {"type": "replay_start", "count": N}
← ev_0 ... ev_N (all buffered events replayed verbatim)
← replay_end {"type": "replay_end"}
← (live events continue from here)
...
← session_sync (after stream finishes — sync final saved state)
The client should suppress cursor animations and other in-progress effects while replay_start..replay_end is in flight.
If the client reconnects after the run has already finished, there is no active _AgentRun, so it receives only session_sync and must fetch history via REST.
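
The reconnect flow can be handled on the client with a small state tracker such as the sketch below. `ReplayTracker` is a hypothetical name, and the returned boolean stands in for "show cursor animations and other in-progress effects for this frame".

```python
class ReplayTracker:
    """Tracks whether incoming frames are replayed history or live events."""

    def __init__(self):
        self.replaying = False
        self.expected = 0            # count announced by replay_start
        self.replayed = 0            # buffered events seen so far
        self.needs_rest_sync = False

    def apply(self, frame: dict) -> bool:
        """Return True when in-progress effects should be shown for this frame."""
        kind = frame.get("type")
        if kind == "replay_start":
            self.replaying, self.expected, self.replayed = True, frame["count"], 0
            return False
        if kind == "replay_end":
            self.replaying = False
            return False
        if kind == "session_sync":
            # Caller should now fetch history via GET /sessions/{id}.
            self.needs_rest_sync = True
            return False
        if self.replaying:
            # Buffered event replayed verbatim: render it without animation.
            self.replayed += 1
            return False
        return True
```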
_runs: dict[str, _AgentRun] — global dict of active runs, keyed by session ID.
_AgentRun holds:
- task (asyncio.Task): the running agent task
- stop_event (asyncio.Event): cooperative stop signal
- subscribers (list[Queue]): one queue per connected WebSocket client
- events (list[dict]): replay buffer; every serialised event dict emitted this turn

Events are broadcast to all subscribers and appended to events. When the run finishes, _runs.pop(session_id) is called from the finally block. The subscribe-then-note-count ordering guarantees no events are missed between the two steps (single-threaded async Python).
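
The broadcast and subscribe-then-note-count behaviour described above can be illustrated with a simplified stand-in. `AgentRunSketch` is not the real `_AgentRun`: it omits the `task` field, and its method names are hypothetical. The point it shows is that no `await` separates the two steps in `subscribe`, so no other coroutine can emit an event in between.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class AgentRunSketch:
    """Simplified stand-in for _AgentRun (the task field is omitted)."""

    stop_event: asyncio.Event = field(default_factory=asyncio.Event)
    subscribers: list = field(default_factory=list)
    events: list = field(default_factory=list)

    def broadcast(self, event: dict) -> None:
        # Append to the replay buffer, then fan out to every live subscriber.
        self.events.append(event)
        for queue in self.subscribers:
            queue.put_nowait(event)

    def subscribe(self):
        queue = asyncio.Queue()
        self.subscribers.append(queue)    # step 1: subscribe
        replay_count = len(self.events)   # step 2: note the count
        # Everything before replay_count is replayed; everything after it
        # arrives through the queue, so nothing is missed or duplicated.
        return queue, list(self.events[:replay_count])
```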