diff --git a/client/js/app.js b/client/js/app.js index 64b00dd..432fa35 100644 --- a/client/js/app.js +++ b/client/js/app.js @@ -483,8 +483,8 @@ } function stopGeneration() { - ws.stop(); btnSend.disabled = true; // prevent double-click while waiting for stream_stopped + fetch(`/sessions/${currentId}/stop`, { method: 'POST' }).catch(console.error); } function onTextareaInput() { diff --git a/client/js/ws.js b/client/js/ws.js index 5940ad5..7c5f72c 100644 --- a/client/js/ws.js +++ b/client/js/ws.js @@ -23,12 +23,6 @@ }; } - stop() { - if (this.#ws?.readyState === WebSocket.OPEN) { - this.#ws.send(JSON.stringify({ type: 'stop' })); - } - } - send(content, images = null, files = null) { if (this.#ws?.readyState === WebSocket.OPEN) { const payload = { type: 'message', content }; diff --git a/navi/api/websocket.py b/navi/api/websocket.py index 34102ef..0f837fd 100644 --- a/navi/api/websocket.py +++ b/navi/api/websocket.py @@ -169,63 +169,17 @@ client_alive = False -async def _stream_recv(websocket: WebSocket, queue: asyncio.Queue, run: _AgentRun) -> bool: - """ - Concurrently forward queue events to the WebSocket AND receive incoming - client messages (stop signals) during streaming. +# ── Endpoints ───────────────────────────────────────────────────────────────── - Uses asyncio.wait to race between the stream finishing and the client - sending a message — no polling, no latency on stop. +@router.post("/sessions/{session_id}/stop") +async def stop_session(session_id: str) -> dict: + """Signal the running agent for this session to stop cooperatively.""" + run = _runs.get(session_id) + if run is not None: + run.stop_event.set() + return {"ok": True} + return {"ok": False, "reason": "no active run"} - Returns True if client stayed connected throughout. - """ - stream_task = asyncio.create_task(_stream_to_client(websocket, queue)) - recv_task: asyncio.Task = asyncio.create_task(websocket.receive_text()) - - try: - while True: - done, _ = await asyncio.wait( - {stream_task, recv_task}, - return_when=asyncio.FIRST_COMPLETED, - ) - - # Incoming message from client - if recv_task in done: - try: - raw = recv_task.result() - try: - data = json.loads(raw) - except json.JSONDecodeError: - data = {} - if data.get("type") == "stop": - run.stop_event.set() - except Exception: - # Receive failed — client disconnected; let stream_task drain - stream_task.cancel() - break - - if stream_task.done(): - break - # Queue next receive - recv_task = asyncio.create_task(websocket.receive_text()) - - # Stream finished - if stream_task in done: - recv_task.cancel() - break - - except Exception: - for t in (stream_task, recv_task): - if not t.done(): - t.cancel() - - try: - return stream_task.result() - except Exception: - return False - - -# ── Endpoint ────────────────────────────────────────────────────────────────── @router.websocket("/ws/sessions/{session_id}") async def websocket_session(session_id: str, websocket: WebSocket) -> None: @@ -257,7 +211,7 @@ queue = existing.subscribe() log.info("ws.reattached", session_id=session_id) await websocket.send_json({"type": "stream_start"}) - connected = await _stream_recv(websocket, queue, existing) + connected = await _stream_to_client(websocket, queue) existing.unsubscribe(queue) queue = None current_run = None @@ -312,7 +266,7 @@ ) await websocket.send_json({"type": "stream_start"}) - connected = await _stream_recv(websocket, queue, run) + connected = await _stream_to_client(websocket, queue) run.unsubscribe(queue) queue = None current_run = None