# WebSocket Protocol

Full protocol reference for the streaming agent interface. File: `navi/api/websocket.py`.

## Connection

`ws://host/ws/sessions/{session_id}`

The session must exist before connecting (create via `POST /sessions`). If the session is not found, the WebSocket closes with code `4004`.

### Authentication

**Browser clients**: cookies are sent automatically during the WebSocket handshake. The server resolves the user from the `navi_auth_session` cookie.

**Headless clients**: append the API token as a query parameter:
```
ws://host/ws/sessions/{session_id}?api_token=nav_aB3xYz9W...
```
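
A headless client can assemble this URL with the Python standard library. The sketch below is illustrative: `build_ws_url` is a hypothetical helper, and using the `wss://` scheme for TLS deployments is an assumption, since the examples above only show `ws://`. `urlencode` percent-encodes the token in case it ever contains reserved characters.

```python
from typing import Optional
from urllib.parse import quote, urlencode


def build_ws_url(host: str, session_id: str, api_token: Optional[str] = None,
                 secure: bool = False) -> str:
    """Build the session WebSocket URL (hypothetical helper, not part of navi)."""
    # Assumption: TLS deployments use wss://; the documented examples use ws://.
    scheme = "wss" if secure else "ws"
    # Percent-encode the session ID so it is safe as a single path segment.
    url = f"{scheme}://{host}/ws/sessions/{quote(session_id, safe='')}"
    if api_token:
        # Headless auth: the token travels as the api_token query parameter.
        url += "?" + urlencode({"api_token": api_token})
    return url
```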

**No-auth mode** (`NAVI_AUTH_ENABLED=false`): neither cookies nor tokens are required. Every WebSocket connection is treated as the local `anonymous` admin user.

> **Security note**: `?api_token` is visible in server access logs. For log-sensitive deployments, future versions may support sending `{type: "auth", api_token: "..."}` as the first WebSocket message after connect instead.

On connect, the server immediately either sends `session_sync` (no active run) or begins the reconnect flow (active run detected).

---

## Messages: client → server

```json
{
    "type": "message",
    "content": "user text",
    "images": ["<base64 image>", "<base64 image>"],
    "files": [{"name": "file.pdf", "path": "/abs/path"}]
}
```

- `type` must be `"message"`. Other types return an error frame.
- `content` is required and must be non-empty.
- `images`: optional list of base64-encoded images (data URIs accepted; the `data:...;base64,` prefix is stripped server-side). **Limits:** max 10 images per message, 5 MB each (decoded). Excess images are rejected with a WebSocket error.
- `files`: optional list of uploaded file references (appended to content as `[Uploaded files on disk: ...]`).
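
Checking these rules on the client avoids a round trip to the error frame. A minimal sketch, assuming the 5 MB limit means 5 MiB (5 * 1024 * 1024 bytes); `build_message` and the constants are hypothetical names, and the server remains the authority on validation.

```python
import base64
import json
from typing import Optional

MAX_IMAGES = 10                    # documented per-message limit
MAX_IMAGE_BYTES = 5 * 1024 * 1024  # documented 5 MB decoded limit (MiB is an assumption)


def _strip_data_uri(image: str) -> str:
    # Mirror the server: drop a "data:...;base64," prefix if present.
    if image.startswith("data:") and ";base64," in image:
        return image.split(",", 1)[1]
    return image


def build_message(content: str, images: Optional[list] = None,
                  files: Optional[list] = None) -> str:
    """Validate and serialise a client "message" frame (hypothetical helper)."""
    if not content:
        raise ValueError("content must be non-empty")
    images = images or []
    if len(images) > MAX_IMAGES:
        raise ValueError(f"at most {MAX_IMAGES} images per message")
    for img in images:
        # validate=True raises binascii.Error (a ValueError) on malformed base64.
        decoded = base64.b64decode(_strip_data_uri(img), validate=True)
        if len(decoded) > MAX_IMAGE_BYTES:
            raise ValueError("image exceeds 5 MB after decoding")
    frame = {"type": "message", "content": content}
    if images:
        frame["images"] = images  # sent as-is; the server strips data URI prefixes
    if files:
        frame["files"] = files
    return json.dumps(frame)
```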

### Concurrent run guard
Only one agent run may be active per session at a time. If a second message arrives while a run is already in progress (either a WebSocket run or a headless recall), the server rejects it with a WebSocket error. The client should wait for `stream_end` before sending the next message.
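
A client can enforce the guard locally before the server has to reject anything. This is a sketch with a hypothetical `RunGate` class, and two of its rules are assumptions the protocol does not state: `stream_stopped` is treated as ending the run, and an `error` frame that arrives before `stream_start` is treated as a rejected send. An `error` during an active run deliberately does not clear the gate, because the rejection of a concurrent message can arrive while the original run is still streaming.

```python
class RunGate:
    """Client-side send guard for the one-run-per-session rule (hypothetical)."""

    def __init__(self) -> None:
        self.busy = False     # a message was sent or a run is streaming
        self.started = False  # stream_start seen for the current run

    def can_send(self) -> bool:
        return not self.busy

    def mark_sent(self) -> None:
        # Block immediately, before stream_start arrives, to avoid a double send.
        self.busy, self.started = True, False

    def on_frame(self, frame: dict) -> None:
        kind = frame.get("type")
        if kind == "stream_start":
            self.busy, self.started = True, True
        elif kind in ("stream_end", "stream_stopped"):
            # Assumption: stream_stopped also ends the run.
            self.busy, self.started = False, False
        elif kind == "error" and not self.started:
            # Assumption: an error before stream_start means the send was rejected.
            self.busy = False
```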

---

## Messages: server → client

All frames are JSON objects with a `type` field.
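
Because every frame carries `type`, a client can dispatch on it with a lookup table. A minimal sketch; `dispatch` and the handler mapping are hypothetical, and ignoring unknown types rather than raising is a design assumption that keeps older clients working if new frame types are added.

```python
import json
from typing import Callable, Dict


def dispatch(raw: str, handlers: Dict[str, Callable[[dict], None]]) -> bool:
    """Decode one server frame and call the handler registered for its type."""
    frame = json.loads(raw)
    handler = handlers.get(frame.get("type"))
    if handler is None:
        return False  # unknown or unhandled frame type: ignore it
    handler(frame)
    return True
```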

### Stream lifecycle

| Frame | When |
|---|---|
| `{"type": "stream_start"}` | Before any agent output begins |
| `{"type": "stream_end", "content": "...", "context_tokens": N, "max_context_tokens": N, "elapsed_seconds": N, "tool_call_count": N, "token_count": N, "message_index": N}` | After final text, before workers |
| `{"type": "stream_stopped"}` | If the user stopped generation |
| `{"type": "error", "message": "..."}` | On any unhandled error |

### Thinking (reasoning)

| Frame | When |
|---|---|
| `{"type": "thinking_delta", "delta": "..."}` | Reasoning chunk during streaming |
| `{"type": "thinking_end"}` | Reasoning phase complete |
| `{"type": "turn_thinking", "thinking": "...", "is_subagent": bool}` | Full reasoning block from a tool-calling turn, delivered whole because that turn uses the non-streaming `complete()` call |

Thinking blocks are collapsible in the UI: open during reasoning, auto-collapsed on `thinking_end`.
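
The open-then-collapse behaviour can be modelled as a small accumulator. Sketch only: `ThinkingBlock` is a hypothetical name, and `turn_thinking` is not handled here because it arrives as one complete block that can be rendered directly.

```python
class ThinkingBlock:
    """Accumulates streamed reasoning and tracks the UI's expanded state."""

    def __init__(self) -> None:
        self.text = ""
        self.expanded = True  # open while reasoning streams in

    def on_frame(self, frame: dict) -> None:
        if frame["type"] == "thinking_delta":
            self.text += frame["delta"]
        elif frame["type"] == "thinking_end":
            self.expanded = False  # auto-collapse once reasoning is complete
```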

### Planning

| Frame | When |
|---|---|
| `{"type": "planning_status", "phase": N, "label": "...", "is_subagent": bool}` | During planning; progress label for the UI. `phase`: 1 = analysis, 2 = reflect, 3 = plan |
| `{"type": "plan_ready", "plan": "...", "is_subagent": bool}` | Before tool-calling loop if `planning_enabled` and a plan was generated |

`planning_status` frames arrive during each planning phase (analysis → optional reflect → plan). `is_subagent: true` means the planning is running inside a subagent; route it into the `spawn_agent` card, never into the top-level UI.

`plan_ready` carries the formatted step list, which the UI renders as a collapsible plan card.
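
Routing on `is_subagent` and labelling phases both reduce to a few lookups. A sketch, with `route_planning_frame` and the target names `spawn_agent_card` and `top_level` as hypothetical stand-ins for real UI components.

```python
PHASE_NAMES = {1: "analysis", 2: "reflect", 3: "plan"}


def route_planning_frame(frame: dict) -> tuple:
    """Return (target, text) for a planning frame; target names are hypothetical."""
    # Subagent planning belongs inside the spawn_agent card, never the top-level UI.
    target = "spawn_agent_card" if frame.get("is_subagent") else "top_level"
    if frame["type"] == "planning_status":
        phase = PHASE_NAMES.get(frame["phase"], "unknown")
        return target, f"[{phase}] {frame['label']}"
    if frame["type"] == "plan_ready":
        return target, frame["plan"]
    raise ValueError(f"not a planning frame: {frame['type']}")
```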

### Tool calls

| Frame | When |
|---|---|
| `{"type": "tool_started", "tool": "name", "args": {...}, "is_subagent": bool}` | Immediately when a tool call begins (before execution) |
| `{"type": "tool_call", "tool": "name", "args": {...}, "result": "...", "success": bool, "is_subagent": bool, "metadata": {...}}` | When the tool finishes |

`is_subagent: true` indicates the tool call was made by a nested subagent, not the top-level agent.
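
Since `tool_started` and `tool_call` frames carry no call ID in the shapes above, a client that shows a spinner per running tool has to pair them some other way. This sketch assumes calls to the same tool at the same nesting level finish in the order they started (FIFO); that ordering is an assumption, and `ToolTracker` is a hypothetical helper.

```python
from collections import defaultdict, deque


class ToolTracker:
    """Pairs tool_started frames with their tool_call results (hypothetical)."""

    def __init__(self) -> None:
        self.pending = defaultdict(deque)  # (tool, is_subagent) -> tool_started frames
        self.finished = []                 # (tool_started or None, tool_call) pairs

    def on_frame(self, frame: dict) -> None:
        kind = frame.get("type")
        if kind not in ("tool_started", "tool_call"):
            return
        key = (frame["tool"], frame.get("is_subagent", False))
        queue = self.pending[key]
        if kind == "tool_started":
            queue.append(frame)
        else:
            # FIFO assumption: the oldest unfinished call to this tool finishes first.
            started = queue.popleft() if queue else None
            self.finished.append((started, frame))

    def running(self) -> int:
        return sum(len(q) for q in self.pending.values())
```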

### Text output

| Frame | When |
|---|---|
| `{"type": "stream_delta", "delta": "..."}` | Text chunk of the final response |
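
Deltas can be collected in a list and joined on demand. Sketch only: `ResponseBuffer` is hypothetical, and preferring the `content` field of `stream_end` over the joined deltas assumes that field holds the complete final text.

```python
class ResponseBuffer:
    """Builds the final response text from stream_delta chunks (hypothetical)."""

    def __init__(self) -> None:
        self.parts = []
        self.final = None

    def on_frame(self, frame: dict) -> None:
        if frame["type"] == "stream_delta":
            self.parts.append(frame["delta"])
        elif frame["type"] == "stream_end":
            # Assumption: stream_end's content is the complete final text.
            self.final = frame.get("content") or "".join(self.parts)

    @property
    def text(self) -> str:
        return self.final if self.final is not None else "".join(self.parts)
```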

### Other events

| Frame | When |
|---|---|
| `{"type": "compression_started", "context_tokens": N, "max_context_tokens": N}` | Immediately before context compression begins (UI can show a spinner) |
| `{"type": "context_compressed", "messages_before": N, "messages_after": N, "summary": "...", "context_tokens": N, "max_context_tokens": N}` | After context compression runs |
| `{"type": "profile_switched", "profile_id": "...", "profile_name": "..."}` | When `switch_profile` tool succeeds |
| `{"type": "recall_update", "session_id": "...", "recall_id": "...", "call_type": "...", "trigger_at": "...", "status": "...", "action": "..."}` | Recall state changed (scheduled, cancelled, fired, rescheduled) |
| `{"type": "mcp_status_update", "server_name": "...", "status": "...", "tool_count": N, "error": "..."}` | MCP server connection status changed (`status` is `connected` or `disconnected`); broadcast to all sessions for toast notifications |
| `{"type": "heartbeat"}` | Periodic keepalive during long silent operations (every 20 s) |
| `{"type": "session_sync"}` | Client should reload session history from REST (`GET /sessions/{id}`) |
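
Heartbeats let a client tell a slow run from a dead connection. A sketch of a watchdog, assuming a 60 s timeout (three missed 20 s heartbeats, an arbitrary margin) and that it is only consulted while a run is active, since heartbeats are documented only during long silent operations. `Watchdog` and its injectable `clock` are hypothetical.

```python
import time
from typing import Callable


class Watchdog:
    """Flags a stalled connection during an active run (hypothetical helper)."""

    def __init__(self, timeout: float = 60.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.timeout = timeout  # assumed: three missed 20 s heartbeats
        self.clock = clock
        self.last_seen = clock()

    def on_frame(self, frame: dict) -> None:
        # Any frame, not just heartbeat, proves the connection is alive.
        self.last_seen = self.clock()

    def stalled(self) -> bool:
        return self.clock() - self.last_seen > self.timeout
```

When `stalled()` returns true, reconnecting is safe: an in-progress run is replayed from its event buffer, and a finished one yields `session_sync`.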

`session_sync` is sent in three situations:
1. On fresh connect when no run is active — in case the agent finished while the client was disconnected.
2. After a reconnect-and-replay completes — to ensure the client sees the fully saved response.
3. After a headless recall run finishes — so the client sees the full recall turn (recall user message + assistant response).

---

## Stopping generation

`POST /sessions/{session_id}/stop`

Sets `_AgentRun.stop_event`. The agent checks this event:
- Before each LLM call
- During streaming (breaks out, calls `aclose()` on the generator)
- After tool execution
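
The three check points can be sketched as one agent turn. This is not navi's loop: `agent_turn` and `fake_llm_stream` are hypothetical stand-ins, and tool execution is reduced to a placeholder `await`. Returning from inside `async for` runs the `finally` clause, which calls `aclose()` so the generator's own cleanup runs immediately instead of waiting for garbage collection.

```python
import asyncio
from typing import AsyncIterator, List


async def fake_llm_stream(chunks: List[str]) -> AsyncIterator[str]:
    # Hypothetical stand-in for a streaming LLM call.
    for chunk in chunks:
        await asyncio.sleep(0)
        yield chunk


async def agent_turn(stop_event: asyncio.Event, chunks: List[str],
                     out: List[str]) -> str:
    """One LLM call plus a tool step, checking stop_event at the documented points."""
    if stop_event.is_set():            # 1. before each LLM call
        return "stopped"
    stream = fake_llm_stream(chunks)
    try:
        async for chunk in stream:
            if stop_event.is_set():    # 2. during streaming: break out...
                return "stopped"
            out.append(chunk)
    finally:
        await stream.aclose()          # ...and close the generator
    await asyncio.sleep(0)             # placeholder for tool execution
    if stop_event.is_set():            # 3. after tool execution
        return "stopped"
    return "done"
```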

The client sends this via `fetch()`, not over the WebSocket, to avoid corrupting the WebSocket receive state.

Response: `{"ok": true}` if a run was active, `{"ok": false, "reason": "no active run"}` otherwise.
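
From Python, the same request can be prepared with `urllib.request`. A sketch with hypothetical helper names; authentication is omitted because this section does not specify how the REST endpoints receive the API token.

```python
import json
import urllib.request


def build_stop_request(base_url: str, session_id: str) -> urllib.request.Request:
    """Prepare POST /sessions/{id}/stop; send it with urllib.request.urlopen()."""
    return urllib.request.Request(
        f"{base_url}/sessions/{session_id}/stop", method="POST")


def parse_stop_response(body: bytes) -> bool:
    """True if a run was active and stopped; False for the "no active run" reply."""
    return bool(json.loads(body).get("ok"))
```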

---

## Reconnection

If the client reconnects to an in-progress run (e.g. page reload mid-stream), `websocket_session()` detects the existing `_AgentRun` in `_runs` and replays the full event buffer before routing live events:

```
← stream_start
← replay_start   {"type": "replay_start", "count": N}
← ev_1 ... ev_N  (all N buffered events replayed verbatim)
← replay_end     {"type": "replay_end"}
← (live events continue from here)
...
← session_sync   (after stream finishes — sync final saved state)
```

The client should suppress cursor animations and other in-progress effects for frames that arrive between `replay_start` and `replay_end`.
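
One way to honour this on the client is a flag toggled by the replay markers. Sketch only; `ReplayTracker` is hypothetical, and counting replayed frames against `count` is an optional sanity check rather than something the protocol requires.

```python
class ReplayTracker:
    """Tells the UI whether a frame is a buffered replay or live (hypothetical)."""

    def __init__(self) -> None:
        self.replaying = False
        self.expected = 0
        self.replayed = 0

    def on_frame(self, frame: dict) -> bool:
        """Return True if live effects (cursor animation etc.) should run."""
        kind = frame["type"]
        if kind == "replay_start":
            self.replaying, self.expected, self.replayed = True, frame["count"], 0
            return False
        if kind == "replay_end":
            self.replaying = False
            return False
        if self.replaying:
            self.replayed += 1  # render the frame, but without animations
            return False
        return True
```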

If the client reconnects after the run has already finished, there is no active `_AgentRun`, so it receives only `session_sync` and must fetch history via REST.

---

## Run state management

`_runs: dict[str, _AgentRun]` — global dict of active runs, keyed by session ID.

`_AgentRun` holds:
- `task: asyncio.Task` — the running agent task
- `stop_event: asyncio.Event` — cooperative stop signal
- `subscribers: list[Queue]` — one queue per connected WebSocket client
- `events: list[dict]` — replay buffer; every serialised event dict emitted this turn

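Events are broadcast to all subscribers **and** appended to `events`. When the run finishes, `_runs.pop(session_id)` is called from the `finally` block. On reconnect, the handler subscribes its queue first and only then notes the replay count (`len(events)`). Anything broadcast after the subscription lands in the new queue, so no event can be missed between the two steps; and since both steps run on asyncio's single thread with no `await` between them, no broadcast can interleave, so the replayed prefix and the live queue do not overlap.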
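
The broadcast and subscribe steps can be sketched with `asyncio` queues. The field names follow `_AgentRun` above, but `AgentRunSketch`, `broadcast`, and `subscribe` are hypothetical; the real class may differ. A reconnecting handler would send `replay_start` with `count`, then `events[:count]`, then `replay_end`, and then forward whatever arrives on the queue.

```python
import asyncio
from dataclasses import dataclass, field
from typing import List, Optional, Tuple


@dataclass
class AgentRunSketch:
    """Mirrors the documented _AgentRun fields; the methods are hypothetical."""
    task: Optional[asyncio.Task] = None
    stop_event: asyncio.Event = field(default_factory=asyncio.Event)
    subscribers: List[asyncio.Queue] = field(default_factory=list)
    events: List[dict] = field(default_factory=list)

    def broadcast(self, event: dict) -> None:
        self.events.append(event)   # replay buffer for future reconnects
        for q in self.subscribers:
            q.put_nowait(event)     # live fan-out; queues are unbounded

    def subscribe(self) -> Tuple[asyncio.Queue, int]:
        # Subscribe first, then note the count, with no await in between:
        # events[:count] are replayed, everything later arrives via the queue.
        q: asyncio.Queue = asyncio.Queue()
        self.subscribers.append(q)
        count = len(self.events)
        return q, count
```

Unbounded queues keep `broadcast` non-blocking, at the cost of memory if a subscriber stops reading.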
