diff --git a/client/js/app.js b/client/js/app.js index f147e77..445f63c 100644 --- a/client/js/app.js +++ b/client/js/app.js @@ -52,7 +52,7 @@ textarea.addEventListener('keydown', onKey); textarea.addEventListener('input', onTextareaInput); textarea.addEventListener('paste', onPaste); - btnSend.addEventListener('click', sendMessage); + btnSend.addEventListener('click', () => streaming ? stopGeneration() : sendMessage()); btnNew.addEventListener('click', newChat); btnAttach.addEventListener('click', () => fileInput.click()); fileInput.addEventListener('change', onFileChange); @@ -229,8 +229,8 @@ streaming = true; currentBubble = null; currentThinking = null; - // Disable input — covers both normal sends and re-attach after page reload. - setInputEnabled(false); + // Switch send button to stop mode; disable attach. + setStreamMode(true); appendTypingIndicator(messagesEl); scrollToBottom(messagesEl); break; @@ -298,6 +298,13 @@ case 'stream_end': finishStream(event.content); updateTokenCounter(event.context_tokens, event.max_context_tokens); + setStreamMode(false); + setInputEnabled(true); + break; + + case 'stream_stopped': + finishStream(); + setStreamMode(false); setInputEnabled(true); break; @@ -321,6 +328,7 @@ case 'error': finishStream(); + setStreamMode(false); appendError(messagesEl, event.message); setInputEnabled(true); break; @@ -354,6 +362,7 @@ currentThinking = null; pendingToolCard = null; pendingSubStep = null; + setStreamMode(false); // Don't clear pendingImages/pendingFiles — those belong to the user's draft } @@ -443,6 +452,26 @@ if (on) textarea.focus(); } +/** Switch the send button between send (↑) and stop (■) modes. */ +function setStreamMode(active) { + if (active) { + btnSend.textContent = '■'; + btnSend.title = 'Stop generation'; + btnSend.disabled = false; + btnSend.classList.add('stop-mode'); + btnAttach.disabled = true; + } else { + btnSend.textContent = '↑'; + btnSend.title = 'Send (Enter)'; + btnSend.classList.remove('stop-mode'); + } +} + +function stopGeneration() { + ws.stop(); + btnSend.disabled = true; // prevent double-click while waiting for stream_stopped +} + /** Re-evaluate send button state based on current upload activity (called from upload callbacks). */ function syncSendButton() { // Only touch send — don't change btnAttach (that's controlled by streaming state). diff --git a/client/js/ws.js b/client/js/ws.js index 7c5f72c..5940ad5 100644 --- a/client/js/ws.js +++ b/client/js/ws.js @@ -23,6 +23,12 @@ }; } + stop() { + if (this.#ws?.readyState === WebSocket.OPEN) { + this.#ws.send(JSON.stringify({ type: 'stop' })); + } + } + send(content, images = null, files = null) { if (this.#ws?.readyState === WebSocket.OPEN) { const payload = { type: 'message', content }; diff --git a/client/style.css b/client/style.css index 69002e7..4f1c84d 100644 --- a/client/style.css +++ b/client/style.css @@ -795,6 +795,9 @@ } .btn-send:hover { background: var(--accent-hover); } .btn-send:disabled { opacity: 0.5; cursor: not-allowed; } +.btn-send.stop-mode { background: #c0392b; font-size: 14px; } +.btn-send.stop-mode:hover { background: #a93226; } +.btn-send.stop-mode:disabled { background: #c0392b; opacity: 0.5; } /* ── Scrollbar ───────────────────────────────────────── */ ::-webkit-scrollbar { width: 5px; } diff --git a/navi/api/websocket.py b/navi/api/websocket.py index dd61975..34102ef 100644 --- a/navi/api/websocket.py +++ b/navi/api/websocket.py @@ -25,7 +25,7 @@ from navi.api.deps import get_session_store from navi.core import Agent, ContextCompressed, StreamEnd, TextDelta, ThinkingDelta, ThinkingEnd, ToolEvent -from navi.core.events import ProfileSwitched, ToolStarted, TurnThinking +from navi.core.events import ProfileSwitched, StreamStopped, ToolStarted, TurnThinking from navi.exceptions import MaxIterationsReached, NaviError, SessionNotFound router = APIRouter(tags=["websocket"]) @@ -38,6 +38,7 @@ class _AgentRun: """Holds the running agent task and all active subscriber queues.""" task: asyncio.Task | None = None + stop_event: asyncio.Event = dataclasses.field(default_factory=asyncio.Event) subscribers: list[asyncio.Queue] = dataclasses.field(default_factory=list) def subscribe(self) -> asyncio.Queue: @@ -102,6 +103,8 @@ return {"type": "turn_thinking", "thinking": event.thinking, "is_subagent": event.is_subagent} if isinstance(event, ProfileSwitched): return {"type": "profile_switched", "profile_id": event.profile_id, "profile_name": event.profile_name} + if isinstance(event, StreamStopped): + return {"type": "stream_stopped"} return None @@ -116,9 +119,16 @@ Execute the agent to completion, broadcasting events to all subscribers. The session is saved by run_stream before StreamEnd — guaranteed even on disconnect. """ + from navi.tools.base import current_stop_event + current_stop_event.set(run.stop_event) + try: async for event in agent.run_stream(session_id, user_content, images=raw_images): await run.broadcast(("event", event)) + except asyncio.CancelledError: + log.info("ws.agent_stopped", session_id=session_id) + await run.broadcast(("stopped", None)) + raise # re-raise so the task is properly marked cancelled except SessionNotFound: await run.broadcast(("error", "Session not found")) except MaxIterationsReached as e: @@ -149,6 +159,8 @@ try: if kind == "error": await websocket.send_json({"type": "error", "message": payload}) + elif kind == "stopped": + await websocket.send_json({"type": "stream_stopped"}) elif kind == "event": msg = _event_to_dict(payload) if msg: @@ -157,6 +169,62 @@ client_alive = False +async def _stream_recv(websocket: WebSocket, queue: asyncio.Queue, run: _AgentRun) -> bool: + """ + Concurrently forward queue events to the WebSocket AND receive incoming + client messages (stop signals) during streaming. + + Uses asyncio.wait to race between the stream finishing and the client + sending a message — no polling, no latency on stop. + + Returns True if client stayed connected throughout. + """ + stream_task = asyncio.create_task(_stream_to_client(websocket, queue)) + recv_task: asyncio.Task = asyncio.create_task(websocket.receive_text()) + + try: + while True: + done, _ = await asyncio.wait( + {stream_task, recv_task}, + return_when=asyncio.FIRST_COMPLETED, + ) + + # Incoming message from client + if recv_task in done: + try: + raw = recv_task.result() + try: + data = json.loads(raw) + except json.JSONDecodeError: + data = {} + if data.get("type") == "stop": + run.stop_event.set() + except Exception: + # Receive failed — client disconnected; let stream_task drain + stream_task.cancel() + break + + if stream_task.done(): + break + # Queue next receive + recv_task = asyncio.create_task(websocket.receive_text()) + + # Stream finished + if stream_task in done: + recv_task.cancel() + break + + except Exception: + for t in (stream_task, recv_task): + if not t.done(): + t.cancel() + + try: + return stream_task.result() + except Exception: + return False + + # ── Endpoint ────────────────────────────────────────────────────────────────── @router.websocket("/ws/sessions/{session_id}") @@ -189,7 +257,7 @@ queue = existing.subscribe() log.info("ws.reattached", session_id=session_id) await websocket.send_json({"type": "stream_start"}) - connected = await _stream_to_client(websocket, queue) + connected = await _stream_recv(websocket, queue, existing) existing.unsubscribe(queue) queue = None current_run = None @@ -244,7 +312,7 @@ ) await websocket.send_json({"type": "stream_start"}) - connected = await _stream_to_client(websocket, queue) + connected = await _stream_recv(websocket, queue, run) run.unsubscribe(queue) queue = None current_run = None diff --git a/navi/core/agent.py b/navi/core/agent.py index 49c0b5c..84e4567 100644 --- a/navi/core/agent.py +++ b/navi/core/agent.py @@ -26,13 +26,14 @@ from navi.config import settings from navi.exceptions import MaxIterationsReached, SessionNotFound from navi.llm.base import LLMBackend, Message, ToolCallRequest -from navi.tools.base import Tool, current_event_sink +from navi.tools.base import Tool, current_event_sink, current_stop_event from .compressor import compress_context, should_compress from .events import ( AgentEvent, ContextCompressed, StreamEnd, + StreamStopped, TextDelta, ThinkingDelta, ThinkingEnd, @@ -177,9 +178,13 @@ log.info("agent.subagent.start", profile_id=profile_id, max_iterations=max_iterations) + stop_event = current_stop_event.get() tool_map = {t.name: t for t in tools} for iteration in range(max_iterations): + if stop_event and stop_event.is_set(): + return accumulated_text if iteration > 0 else "" + log.debug("agent.subagent.iteration", iteration=iteration) accumulated_text = "" @@ -192,6 +197,8 @@ temperature=profile.temperature, model=profile.model, ): + if stop_event and stop_event.is_set(): + break if chunk.thinking: accumulated_thinking += chunk.thinking if chunk.delta: @@ -199,6 +206,9 @@ if chunk.tool_calls: turn_tool_calls = chunk.tool_calls + if stop_event and stop_event.is_set(): + return accumulated_text + if not turn_tool_calls: log.info("agent.subagent.complete", iterations=iteration + 1, result_len=len(accumulated_text)) @@ -322,9 +332,17 @@ # before the assistant reply is ready. await self._sessions.save(session) + stop_event = current_stop_event.get() + # Tool-calling loop — uses stream_complete() for every turn so thinking # is captured in real-time via ThinkingDelta/ThinkingEnd events. for iteration in range(profile.max_iterations): + # Cooperative stop: check before each LLM call + if stop_event and stop_event.is_set(): + await self._sessions.save(session) + yield StreamStopped() + return + accumulated_text = "" turn_tool_calls: list[ToolCallRequest] | None = None thinking_active = False @@ -336,6 +354,12 @@ temperature=profile.temperature, model=profile.model, ): + # Cooperative stop: break cleanly so aclose() is called on the + # generator → Ollama stream closes gracefully, model stays in VRAM. + if stop_event and stop_event.is_set(): + if thinking_active: + yield ThinkingEnd() + break if chunk.prompt_tokens is not None or chunk.completion_tokens is not None: context_tokens = (chunk.prompt_tokens or 0) + (chunk.completion_tokens or 0) if chunk.thinking: @@ -354,6 +378,17 @@ thinking_active = False yield ThinkingEnd() + # Stopped mid-stream — save partial response and exit + if stop_event and stop_event.is_set(): + if accumulated_text: + session.messages.append(Message( + role="assistant", content=accumulated_text, + created_at=datetime.now(timezone.utc), + )) + await self._sessions.save(session) + yield StreamStopped() + return + if not turn_tool_calls: # Final response — text already streamed above assistant_msg = Message(role="assistant", content=accumulated_text, @@ -402,7 +437,7 @@ finally: await _sink.put(_TOOL_DONE) - asyncio.create_task(_run_with_sentinel()) + tool_task = asyncio.create_task(_run_with_sentinel()) current_event_sink.reset(sink_token) # outer ctx restored; task has its own copy # 3. Block on the sink until the sentinel arrives. @@ -426,7 +461,13 @@ if image_msg: session.context.append(image_msg) - # 6. If switch_profile was called this iteration, reload profile + tools. + # 6. Cooperative stop: check after tool execution before next LLM call + if stop_event and stop_event.is_set(): + await self._sessions.save(session) + yield StreamStopped() + return + + # 7. If switch_profile was called this iteration, reload profile + tools. # switch_profile updates the DB but run_stream() holds a local session # object — without this check the final save would overwrite the change # and the next LLM call would still use the old tool schemas. diff --git a/navi/core/compressor.py b/navi/core/compressor.py index 44adbcd..0276dff 100644 --- a/navi/core/compressor.py +++ b/navi/core/compressor.py @@ -117,6 +117,11 @@ return "\n".join(lines), images +# Safety limit: truncate formatted input to this many characters before sending to LLM. +# Prevents the summarizer from receiving near-context-sized input it can't fit alongside output. +_MAX_SUMMARY_INPUT_CHARS = 12_000 + + async def compress_context( context: list[Message], llm: LLMBackend, @@ -145,12 +150,18 @@ return None # nothing substantial to compress summary_text_input, images = _format_for_summary(to_summarize) + + # Truncate oversized input so the summarizer LLM has room to generate output + if len(summary_text_input) > _MAX_SUMMARY_INPUT_CHARS: + summary_text_input = summary_text_input[:_MAX_SUMMARY_INPUT_CHARS] + "\n…[truncated]" + prompt = [ Message(role="system", content=_SUMMARIZE_SYSTEM), Message(role="user", content=summary_text_input, images=images or None), ] - response = await llm.complete(prompt, tools=None, temperature=temperature, model=model) + # think=False: compression must be fast — extended reasoning wastes context and hangs + response = await llm.complete(prompt, tools=None, temperature=temperature, model=model, think=False) summary_text = (response.content or "").strip() or "(summary unavailable)" summary_msg = Message( diff --git a/navi/core/events.py b/navi/core/events.py index 15081ca..75578eb 100644 --- a/navi/core/events.py +++ b/navi/core/events.py @@ -52,6 +52,11 @@ @dataclass +class StreamStopped: + """Emitted when the user stops generation mid-stream (cooperative stop).""" + + +@dataclass class ContextCompressed: """Emitted after context compression runs successfully.""" @@ -82,5 +87,5 @@ AgentEvent = ( ToolStarted | ToolEvent | TextDelta | ThinkingDelta | ThinkingEnd - | StreamEnd | ContextCompressed | TurnThinking | ProfileSwitched + | StreamEnd | StreamStopped | ContextCompressed | TurnThinking | ProfileSwitched ) diff --git a/navi/llm/base.py b/navi/llm/base.py index c735b44..5b688e2 100644 --- a/navi/llm/base.py +++ b/navi/llm/base.py @@ -76,6 +76,7 @@ tools: list[ToolSchema] | None = None, temperature: float = 0.7, model: str | None = None, + think: bool | None = None, ) -> LLMResponse: """Single-shot completion. Used in the agent tool-calling loop.""" diff --git a/navi/llm/ollama.py b/navi/llm/ollama.py index a3f3c4e..578f73c 100644 --- a/navi/llm/ollama.py +++ b/navi/llm/ollama.py @@ -30,9 +30,11 @@ return [t.model_dump() for t in tools] -def _base_options(temperature: float) -> dict: +def _base_options(temperature: float, think: bool | None = None) -> dict: opts: dict = {"temperature": temperature, "num_ctx": settings.ollama_num_ctx} - if settings.ollama_think: + # think=None → use global setting; think=False → force off even if global is True + effective_think = settings.ollama_think if think is None else think + if effective_think: opts["think"] = True return opts @@ -48,12 +50,13 @@ tools: list[ToolSchema] | None = None, temperature: float = 0.7, model: str | None = None, + think: bool | None = None, ) -> LLMResponse: try: kwargs: dict = { "model": model or self.model, "messages": _to_ollama_messages(messages), - "options": _base_options(temperature), + "options": _base_options(temperature, think=think), "stream": False, } if tools: diff --git a/navi/llm/openai_backend.py b/navi/llm/openai_backend.py index 527ba0b..2adfbf4 100644 --- a/navi/llm/openai_backend.py +++ b/navi/llm/openai_backend.py @@ -22,6 +22,7 @@ tools: list[ToolSchema] | None = None, temperature: float = 0.7, model: str | None = None, + think: bool | None = None, ) -> LLMResponse: raise NotImplementedError("OpenAI backend not yet implemented") diff --git a/navi/profiles/secretary.py b/navi/profiles/secretary.py index 3e55548..e1736ce 100644 --- a/navi/profiles/secretary.py +++ b/navi/profiles/secretary.py @@ -20,5 +20,5 @@ enabled_tools=["todo", "switch_profile", "web_search", "web_view", "http_request", "filesystem", "code_exec", "terminal", "ssh_exec", "image_view", "memory_search", "memory_forget", "reload_tools", "write_tool", "list_tools", "tool_manual", "spawn_agent"], model="gemma4:26b-a4b-it-q4_K_M", temperature=0.7, - max_iterations=30, + max_iterations=100, ) diff --git a/navi/profiles/server_admin.py b/navi/profiles/server_admin.py index b970e8a..d470397 100644 --- a/navi/profiles/server_admin.py +++ b/navi/profiles/server_admin.py @@ -21,8 +21,8 @@ 3. Before destructive or irreversible operations, state what you're about to do and why. When delegating to sub-agents: assign each a single host or a single domain of concern. Include exact connection details and expected output format in every briefing.""", - enabled_tools=["todo", "switch_profile", "terminal", "filesystem", "http_request", "web_view", "web_search", "ssh_exec", "image_view", "memory_search", "memory_forget", "reload_tools", "write_tool", "list_tools", "tool_manual", "spawn_agent"], + enabled_tools=["todo", "switch_profile", "terminal", "filesystem", "http_request", "web_view", "web_search", "ssh_exec", "image_view", "memory_search", "memory_forget", "reload_tools", "write_tool", "list_tools", "tool_manual", "spawn_agent", "code_exec"], model="gemma4:26b-a4b-it-q4_K_M", temperature=0.2, - max_iterations=30, + max_iterations=100, ) diff --git a/navi/profiles/smart_home.py b/navi/profiles/smart_home.py index 115dae9..e935b14 100644 --- a/navi/profiles/smart_home.py +++ b/navi/profiles/smart_home.py @@ -22,5 +22,5 @@ enabled_tools=["todo", "switch_profile", "http_request", "web_view", "filesystem", "code_exec", "terminal", "ssh_exec", "image_view", "memory_search", "memory_forget", "reload_tools", "write_tool", "list_tools", "tool_manual", "spawn_agent"], model="gemma4:26b-a4b-it-q4_K_M", temperature=0.3, - max_iterations=30, + max_iterations=100, ) diff --git a/navi/tools/base.py b/navi/tools/base.py index e2370d5..a2b3dc6 100644 --- a/navi/tools/base.py +++ b/navi/tools/base.py @@ -22,6 +22,11 @@ # sub-agent tool events up to the parent WS stream. current_event_sink: ContextVar[asyncio.Queue | None] = ContextVar("current_event_sink", default=None) +# Set by _run_agent() before run_stream(). Cooperative stop: when set, the agent +# breaks out of LLM loops cleanly (aclose() is called → Ollama stream closes gracefully, +# model stays in VRAM). Never use task.cancel() for stopping generation. +current_stop_event: ContextVar[asyncio.Event | None] = ContextVar("current_stop_event", default=None) + @dataclass class ToolResult: diff --git a/navi/tools/terminal.py b/navi/tools/terminal.py index 25916ea..101ab2d 100644 --- a/navi/tools/terminal.py +++ b/navi/tools/terminal.py @@ -14,7 +14,7 @@ from .base import Tool, ToolResult -_TIMEOUT = 180 +_TIMEOUT = 300 class TerminalTool(Tool):