
WebSocket Protocol

Full protocol reference for the streaming agent interface. File: navi/api/websocket.py.

Connection

ws://host/ws/sessions/{session_id}

The session must exist before connecting (create via POST /sessions). If the session is not found, the WebSocket closes with code 4004.

On connect, the server immediately either sends session_sync (no active run) or begins the reconnect flow (active run detected).


Messages: client → server

{
    "type": "message",
    "content": "user text",
    "images": ["base64string", ...],
    "files": [{"name": "file.pdf", "path": "/abs/path"}]
}
  • type must be "message". Other types return an error frame.
  • content is required and must be non-empty.
  • images: optional list of base64-encoded images (data URIs accepted; the data:...;base64, prefix is stripped server-side). Limits: max 10 images per message, 5 MB each (decoded). Excess images are rejected with a WebSocket error.
  • files: optional list of uploaded file references (appended to content as [Uploaded files on disk: ...]).
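
The rules above can be checked on the client before a frame is sent. The following is a minimal sketch, not the server's own validation: build_message, strip_data_uri, and the two constants are hypothetical names, and reading "5 MB" as 5 × 1024 × 1024 bytes is an assumption.

```python
import base64
import json

MAX_IMAGES = 10                    # per-message limit stated above
MAX_IMAGE_BYTES = 5 * 1024 * 1024  # assumption: "5 MB" means 5 * 2**20 decoded bytes


def strip_data_uri(image: str) -> str:
    """Drop a leading 'data:...;base64,' prefix, mirroring what the server is said to do."""
    if image.startswith("data:") and ";base64," in image:
        return image.split(";base64,", 1)[1]
    return image


def build_message(content, images=None, files=None) -> str:
    """Return a JSON text frame, raising ValueError if a documented rule is violated."""
    if not content:
        raise ValueError("content is required and must be non-empty")
    images = list(images or [])
    if len(images) > MAX_IMAGES:
        raise ValueError(f"at most {MAX_IMAGES} images per message")
    for img in images:
        decoded = base64.b64decode(strip_data_uri(img))
        if len(decoded) > MAX_IMAGE_BYTES:
            raise ValueError("image exceeds the 5 MB decoded-size limit")
    frame = {"type": "message", "content": content}
    if images:
        frame["images"] = images   # data URIs are sent as-is; the server strips the prefix
    if files:
        frame["files"] = list(files)
    return json.dumps(frame)
```

Checking locally avoids a round trip that would only end in a rejection, but the server remains the authority on what it accepts.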

Concurrent run guard

Only one agent run may be active per session at a time. If a second message arrives while a run is already in progress (either a WebSocket run or a headless recall), the server rejects it with a WebSocket error. The client should wait for stream_end before sending the next message.
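
A client can mirror this guard locally so that it never sends while its own run is in flight. The sketch below is illustrative only: RunGate is a hypothetical helper, and treating stream_stopped as a run-ending frame alongside stream_end is an assumption. It cannot see headless recalls, so the server may still reject a message.

```python
RUN_ENDING_TYPES = {"stream_end", "stream_stopped"}  # assumption: both end the run


class RunGate:
    """Tracks whether this client believes one of its runs is in progress."""

    def __init__(self):
        self.busy = False

    def try_send(self) -> bool:
        """Return True and mark the gate busy if a new message may be sent."""
        if self.busy:
            return False
        self.busy = True
        return True

    def on_frame(self, frame: dict) -> None:
        """Release the gate when a frame that ends the run arrives."""
        if frame.get("type") in RUN_ENDING_TYPES:
            self.busy = False
```

How an error frame should affect the gate is left open here, because the text above does not say whether every error frame ends the run.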


Messages: server → client

All frames are JSON objects with a type field.

Stream lifecycle

| Frame | When |
| --- | --- |
| `{"type": "stream_start"}` | Before any agent output begins |
| `{"type": "stream_end", "content": "...", "context_tokens": N, "max_context_tokens": N, "elapsed_seconds": N, "tool_call_count": N, "token_count": N, "message_index": N}` | After final text, before workers |
| `{"type": "stream_stopped"}` | If the user stopped generation |
| `{"type": "error", "message": "..."}` | On any unhandled error |

Thinking (reasoning)

| Frame | When |
| --- | --- |
| `{"type": "thinking_delta", "delta": "..."}` | Reasoning chunk during streaming |
| `{"type": "thinking_end"}` | Reasoning phase complete |
| `{"type": "turn_thinking", "thinking": "...", "is_subagent": bool}` | Full reasoning block from a tool-calling turn (complete(), non-streaming) |

Thinking blocks are collapsible in the UI: open during reasoning, auto-collapsed on thinking_end.
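
The open-then-collapse behaviour can be modelled as a small piece of state driven by the two streaming frames. This is a sketch of one possible client model; ThinkingBlock is a hypothetical name and is not taken from the UI code.

```python
class ThinkingBlock:
    """Accumulates reasoning text and tracks whether the block is shown collapsed."""

    def __init__(self):
        self.text = ""
        self.collapsed = True  # nothing to show until the first chunk arrives

    def on_frame(self, frame: dict) -> None:
        kind = frame.get("type")
        if kind == "thinking_delta":
            self.text += frame["delta"]
            self.collapsed = False   # open while reasoning streams in
        elif kind == "thinking_end":
            self.collapsed = True    # auto-collapse; the text stays available
```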

Planning

| Frame | When |
| --- | --- |
| `{"type": "planning_status", "phase": N, "label": "...", "is_subagent": bool}` | During the planning phase; progress label for the UI. phase is 1 (analysis), 2 (reflect), or 3 (plan) |
| `{"type": "plan_ready", "plan": "...", "is_subagent": bool}` | Before the tool-calling loop, if planning_enabled is set and a plan was generated |

planning_status frames arrive during each planning phase (analysis → optional reflect → plan). is_subagent: true means the planning is running inside a subagent — route it into the spawn_agent card, never into the top-level UI.

plan_ready carries the formatted step list. Rendered as a collapsible plan card in the UI.

Tool calls

| Frame | When |
| --- | --- |
| `{"type": "tool_started", "tool": "name", "args": {...}, "is_subagent": bool}` | Immediately when a tool call begins (before execution) |
| `{"type": "tool_call", "tool": "name", "args": {...}, "result": "...", "success": bool, "is_subagent": bool, "metadata": {...}}` | When the tool finishes |

is_subagent: true indicates the tool call was made by a nested subagent, not the top-level agent.
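
Because several frame types carry is_subagent, a client can route on that flag in one place. The sketch below assumes two plain lists as stand-ins for the top-level view and the spawn_agent card; route and SUBAGENT_AWARE are hypothetical names.

```python
# Frame types documented as carrying an is_subagent field.
SUBAGENT_AWARE = {
    "turn_thinking",
    "planning_status",
    "plan_ready",
    "tool_started",
    "tool_call",
}


def route(frame: dict, top_level: list, subagent: list) -> None:
    """Append the frame to the subagent sink if it is flagged, else to the top-level sink."""
    if frame.get("type") in SUBAGENT_AWARE and frame.get("is_subagent", False):
        subagent.append(frame)
    else:
        top_level.append(frame)
```

A missing is_subagent field is treated as false here, which is an assumption about how older or partial frames should be handled.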

Text output

| Frame | When |
| --- | --- |
| `{"type": "stream_delta", "delta": "..."}` | Text chunk of the final response |
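
As an illustration of how the text frames combine with the lifecycle frames, the sketch below concatenates stream_delta chunks until a frame that ends the run arrives. collect_response is a hypothetical helper, and frames is any iterable of already-parsed frame dicts.

```python
def collect_response(frames):
    """Return (text, closing_frame), where closing_frame is None if the stream ran out first."""
    parts = []
    for frame in frames:
        kind = frame.get("type")
        if kind == "stream_delta":
            parts.append(frame["delta"])
        elif kind in ("stream_end", "stream_stopped", "error"):
            return "".join(parts), frame
        # All other frame types (thinking, tools, heartbeat, ...) are ignored here.
    return "".join(parts), None
```

stream_end also carries a content field; this sketch returns the closing frame so the caller can use it, and does not assume it equals the concatenated deltas.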

Other events

| Frame | When |
| --- | --- |
| `{"type": "compression_started", "context_tokens": N, "max_context_tokens": N}` | Immediately before context compression begins (UI can show a spinner) |
| `{"type": "context_compressed", "messages_before": N, "messages_after": N, "summary": "...", "context_tokens": N, "max_context_tokens": N}` | After context compression runs |
| `{"type": "profile_switched", "profile_id": "...", "profile_name": "..."}` | When the switch_profile tool succeeds |
| `{"type": "recall_update", "session_id": "...", "recall_id": "...", "call_type": "...", "trigger_at": "...", "status": "...", "action": "..."}` | Recall state changed (scheduled, cancelled, fired, rescheduled) |
| `{"type": "heartbeat"}` | Periodic keepalive during long silent operations (every 20 s) |
| `{"type": "session_sync"}` | Client should reload session history from REST (GET /sessions/{id}) |

session_sync is sent in three situations:

  1. On fresh connect when no run is active — in case the agent finished while the client was disconnected.
  2. After a reconnect-and-replay completes — to ensure the client sees the fully saved response.
  3. After a headless recall run finishes — so the client sees the full recall turn (recall user message + assistant response).
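
In all three cases the client's response is the same: fetch the session over REST. A minimal standard-library sketch follows; base_url and the helper names are assumptions, and the shape of the response body is not specified here, so it is returned as parsed JSON without interpretation.

```python
import json
import urllib.request


def history_request(base_url: str, session_id: str) -> urllib.request.Request:
    """Build the GET /sessions/{id} request without sending it."""
    url = f"{base_url.rstrip('/')}/sessions/{session_id}"
    return urllib.request.Request(url, method="GET")


def reload_history(base_url: str, session_id: str, timeout: float = 10.0):
    """Send the request and return the decoded JSON body."""
    with urllib.request.urlopen(history_request(base_url, session_id), timeout=timeout) as resp:
        return json.load(resp)


def on_frame(frame: dict, base_url: str, session_id: str):
    """Reload history on session_sync; return None for every other frame type."""
    if frame.get("type") == "session_sync":
        return reload_history(base_url, session_id)
    return None
```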

Stopping generation

POST /sessions/{session_id}/stop

Sets _AgentRun.stop_event. The agent checks this event:

  • Before each LLM call
  • During streaming (breaks out, calls aclose() on the generator)
  • After tool execution

The client sends this via fetch(), not over the WebSocket, to avoid corrupting the WebSocket receive state.

Response: {"ok": true} if a run was active, {"ok": false, "reason": "no active run"} otherwise.
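
The three checkpoints can be pictured with a small asyncio sketch. It is not the code in navi/api/websocket.py: fake_llm_stream, fake_tool, and run_turn are hypothetical stand-ins, and the single stream-then-tool ordering is a simplification chosen to show each check once.

```python
import asyncio


async def fake_llm_stream():
    """Stand-in for a streaming LLM call (hypothetical)."""
    for chunk in ["a", "b", "c", "d"]:
        await asyncio.sleep(0)  # yield to the event loop, as real I/O would
        yield chunk


async def fake_tool():
    """Stand-in for tool execution (hypothetical)."""
    await asyncio.sleep(0)


async def run_turn(stop_event: asyncio.Event, out: list) -> str:
    """Run one simplified turn and return the type of the closing frame."""
    if stop_event.is_set():              # 1. before the LLM call
        return "stream_stopped"
    stream = fake_llm_stream()
    try:
        async for chunk in stream:
            out.append(chunk)
            if stop_event.is_set():      # 2. during streaming: break out early
                return "stream_stopped"
    finally:
        await stream.aclose()            # close the generator on every exit path
    await fake_tool()
    if stop_event.is_set():              # 3. after tool execution
        return "stream_stopped"
    return "stream_end"
```

The stop is cooperative: setting the event never interrupts an await in progress, so the run ends at the next checkpoint rather than instantly.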


Reconnection

If the client reconnects to an in-progress run (e.g. page reload mid-stream), websocket_session() detects the existing _AgentRun in _runs and replays the full event buffer before routing live events:

← stream_start
← replay_start   {"type": "replay_start", "count": N}
← ev_0 ... ev_N-1  (all N buffered events replayed verbatim)
← replay_end     {"type": "replay_end"}
← (live events continue from here)
...
← session_sync   (after stream finishes — sync final saved state)

The client should suppress cursor animations and other in-progress effects for every frame received between replay_start and replay_end.

If the client reconnects after the run has already finished, there is no active _AgentRun, so it receives only session_sync and must fetch history via REST.
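
On the client, the replay window can be tracked with a flag that flips on replay_start and replay_end. The sketch below is one way to do it; ReplayTracker is a hypothetical name, and counting replayed frames against count is an optional sanity check, not something the protocol requires.

```python
class ReplayTracker:
    """Tells the UI whether a frame is live or part of a reconnect replay."""

    def __init__(self):
        self.replaying = False
        self.expected = 0   # the count announced by replay_start
        self.replayed = 0   # frames seen inside the current replay window

    def on_frame(self, frame: dict) -> bool:
        """Return True if in-progress effects (cursor animation etc.) may be shown."""
        kind = frame.get("type")
        if kind == "replay_start":
            self.replaying = True
            self.expected = frame.get("count", 0)
            self.replayed = 0
            return False
        if kind == "replay_end":
            self.replaying = False
            return False
        if self.replaying:
            self.replayed += 1
            return False
        return True
```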


Run state management

_runs: dict[str, _AgentRun] — global dict of active runs, keyed by session ID.

_AgentRun holds:

  • task: asyncio.Task — the running agent task
  • stop_event: asyncio.Event — cooperative stop signal
  • subscribers: list[Queue] — one queue per connected WebSocket client
  • events: list[dict] — replay buffer; every serialised event dict emitted this turn

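Events are broadcast to all subscribers and appended to events. When the run finishes, _runs.pop(session_id) is called from the finally block. On reconnect, the handler subscribes its queue first and then notes the current length of events as the replay count. Because asyncio is single-threaded and no other task runs between those two steps, no event can be missed between the replayed buffer and the live stream.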
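
The broadcast and subscribe ordering can be sketched as follows. This is a simplified model rather than the real _AgentRun: AgentRunSketch, emit, and subscribe are hypothetical names, and the task field is omitted.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class AgentRunSketch:
    """Simplified stand-in for _AgentRun (the task field is left out)."""

    stop_event: asyncio.Event = field(default_factory=asyncio.Event)
    subscribers: list = field(default_factory=list)  # one asyncio.Queue per client
    events: list = field(default_factory=list)       # replay buffer for this turn

    def emit(self, event: dict) -> None:
        """Append to the replay buffer and broadcast to every live subscriber."""
        self.events.append(event)
        for queue in self.subscribers:
            queue.put_nowait(event)

    def subscribe(self):
        """Subscribe first, then note the count; there is no await between the two steps."""
        queue = asyncio.Queue()
        self.subscribers.append(queue)
        count = len(self.events)
        return queue, count
```

A reconnecting handler would replay events[:count] and then read from the queue. Anything emitted after subscribe() returns lands in the queue only, so the replayed and live ranges neither overlap nor leave a gap.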