async def _iter_stream_guarded(
    stream_gen: "AsyncGenerator[LLMChunk, None]",
    stop_event: "asyncio.Event | None",
    first_chunk_timeout: float,
    chunk_timeout: float,
) -> "AsyncGenerator[LLMChunk, None]":
    """Yield chunks from ``stream_gen`` with stop-event polling and timeouts.

    Two safety mechanisms are layered on top of the wrapped generator:

    1. Stop-event responsiveness while no chunk is arriving.
       The pending ``__anext__`` call is awaited in slices of at most one
       second; between slices ``stop_event`` is checked, so a stop request
       is honoured even when the backend has not produced a token yet
       (e.g. during a long prefill).

    2. Timeouts as a last-resort safety net.
       ``first_chunk_timeout`` bounds the wait for the first chunk,
       ``chunk_timeout`` bounds the gap between any two later chunks.
       Elapsed time is measured with the event loop's monotonic clock.

    Args:
        stream_gen: The async generator to wrap. It is always closed with
            ``aclose()`` when this wrapper finishes, for whatever reason.
        stop_event: Optional event; once set, iteration ends quietly at the
            next poll or right after the chunk currently being delivered.
        first_chunk_timeout: Seconds to wait for the first chunk.
        chunk_timeout: Seconds to wait for each subsequent chunk.

    Yields:
        Each chunk produced by ``stream_gen``, unchanged and in order.

    Raises:
        LLMBackendError: If a chunk does not arrive within its timeout.
        Exception: Anything raised by ``stream_gen`` itself is propagated.
    """
    loop = asyncio.get_running_loop()
    poll_interval = 1.0
    first = True
    try:
        while True:
            timeout = first_chunk_timeout if first else chunk_timeout
            # Exactly one task per chunk, reused across poll iterations, so
            # there is never more than one concurrent __anext__ call.
            chunk_task: asyncio.Task = asyncio.ensure_future(stream_gen.__anext__())
            started = loop.time()
            try:
                while True:
                    remaining = timeout - (loop.time() - started)
                    # asyncio.wait() never cancels the task on timeout; it
                    # only stops waiting, which is what makes polling safe.
                    done, _ = await asyncio.wait(
                        {chunk_task},
                        timeout=max(0.0, min(poll_interval, remaining)),
                    )
                    if done:
                        break
                    if stop_event and stop_event.is_set():
                        return
                    elapsed = loop.time() - started
                    if elapsed >= timeout:
                        label = "first token (context may be too large for this model)" if first else "next token"
                        raise LLMBackendError(
                            f"LLM timed out after {elapsed:.0f}s waiting for {label}."
                        )
            finally:
                # Reached on stop, timeout AND on cancellation of the caller.
                # The pending __anext__ must be finished before aclose() can
                # succeed; otherwise the generator is still "running", the
                # close fails and the task (and its connection) leaks.
                if not chunk_task.done():
                    chunk_task.cancel()
                    try:
                        await chunk_task
                    except (asyncio.CancelledError, Exception):
                        # Best effort: we only need the task to be finished.
                        pass

            try:
                chunk = chunk_task.result()
            except StopAsyncIteration:
                return

            first = False
            yield chunk

            if stop_event and stop_event.is_set():
                return

    finally:
        # Closing the generator lets the backend release its resources
        # (per the original notes: the HTTP connection to Ollama is dropped,
        # which stops generation). Failures here are deliberately ignored.
        try:
            await stream_gen.aclose()
        except Exception:
            pass
def _check_context_size(self, context: "list[Message]") -> None:
    """Raise ContextTooLargeError before an LLM call if the context is dangerously large.

    Uses a conservative character-based estimate (~4 chars per token for text).
    Images are counted at 500 tokens each (rough vision-model estimate), so a
    message carrying several images contributes 500 per image.
    Threshold: 92% of ollama_num_ctx — leaves headroom for the response.

    Args:
        context: Messages about to be sent to the model. Only ``content`` and
            ``images`` are inspected.
            NOTE(review): assumes ``images`` is a sized collection (or
            ``None``/empty) — confirm against the ``Message`` definition.

    Raises:
        ContextTooLargeError: If the estimate exceeds the safe limit.
    """
    text_tokens = sum(len(m.content or "") for m in context) // 4
    # Count every image, not every message that happens to have images.
    image_tokens = sum(500 * len(m.images) for m in context if m.images)
    estimated = text_tokens + image_tokens
    limit = int(settings.ollama_num_ctx * 0.92)
    if estimated > limit:
        raise ContextTooLargeError(
            f"Context too large: ~{estimated:,} estimated tokens, "
            f"safe limit {limit:,} (num_ctx={settings.ollama_num_ctx}). "
            "Try splitting files into smaller parts or use a subagent for heavy analysis."
        )