diff --git a/docs/agent.md b/docs/agent.md index dbe3944..b51ca01 100644 --- a/docs/agent.md +++ b/docs/agent.md @@ -21,6 +21,9 @@ `run_ephemeral` reads the parent session from the DB when `parent_session_id` is provided, so session-aware tools (filesystem, todo, scratchpad) operate on the parent's data. +### ContextVar restoration +`run_ephemeral` saves the parent's `current_session_id`, `current_model`, `current_user_id`, `current_user_role`, and `current_user_info` before starting and restores them in a `finally` block. This prevents background tasks or the next parent iteration from inheriting stale subagent IDs. + --- ## Planning phase (`_run_planning`) @@ -104,6 +107,17 @@ ### Cooperative stop Stop is signalled via `current_stop_event` (an `asyncio.Event`). Checked before each LLM call, during streaming, and after tool execution. Never use `task.cancel()` — it corrupts WebSocket state. +### Streaming guard wrapper +`run_stream()` wraps the LLM generator with `_iter_stream_guarded()`, which provides two safety layers: + +1. **Stop-event polling during prefill.** Ollama emits no chunks during prefill, so a plain `await` on the first token can block for minutes. The wrapper polls `stop_event` every second so the user's Stop button works even during silent prefill. +2. **Hard timeouts.** `first_chunk_timeout` (default 120 s) caps prefill wait time. `chunk_timeout` (default 60 s) caps gaps between subsequent tokens. On timeout the generator is closed, terminating the HTTP connection to Ollama so GPU load drops to idle. + +| Env var | Default | Purpose | +|---|---|---| +| `LLM_STREAM_FIRST_CHUNK_TIMEOUT` | `120` | Max seconds to wait for the first token | +| `LLM_STREAM_CHUNK_TIMEOUT` | `60` | Max seconds between tokens after the first | + --- ## Workers @@ -122,3 +136,6 @@ 3. `session.context` messages (system messages stripped to avoid duplication). Profile switches and persona changes take effect immediately. + +### System prompt caching +The built system prompt string is cached per profile ID in `ContextBuilder` to avoid rebuilding on every turn. The cache is invalidated when the profile is reloaded (e.g. after `switch_profile` or hot-reload). This saves ~1–2 ms per turn for profiles with large system prompts. diff --git a/docs/mechanics.md b/docs/mechanics.md index 86c06a9..9f4b9c2 100644 --- a/docs/mechanics.md +++ b/docs/mechanics.md @@ -24,7 +24,7 @@ |---|---|---|---|---| | **Streaming entry point** | `run_stream()` — yields `AgentEvent` objects in real time. Loads session, runs planning (if enabled), tool loop, workers. | `profile.max_iterations`, `profile.llm_backend`, `profile.model`, `profile.temperature` | `agent.py` | ✅ | | **Non-streaming entry point** | `run()` — same loop but returns plain string. No planning phase, no events. | Same as above | `agent.py` | ✅ | -| **Streaming guard wrapper** | Wraps `llm.stream_complete()` with two safety layers: (1) polls `stop_event` every second during prefill so the Stop button works even when the model emits no chunks, and (2) hard `first_chunk_timeout`/`chunk_timeout` deadlines that close the HTTP connection to Ollama so GPU load drops. | `LLM_STREAM_FIRST_CHUNK_TIMEOUT`, `LLM_STREAM_CHUNK_TIMEOUT` | `agent.py` | ❌ | +| **Streaming guard wrapper** | Wraps `llm.stream_complete()` with two safety layers: (1) polls `stop_event` every second during prefill so the Stop button works even when the model emits no chunks, and (2) hard `first_chunk_timeout`/`chunk_timeout` deadlines that close the HTTP connection to Ollama so GPU load drops. | `LLM_STREAM_FIRST_CHUNK_TIMEOUT`, `LLM_STREAM_CHUNK_TIMEOUT` | `agent.py` | ✅ | | **Subagent thinking stall detector** | Monitors subagent streaming; if only `thinking` output is emitted for 60 s or 12 000 chars without text/tool calls, aborts the subagent to prevent endless internal-token loops on local models. | Hard-coded `_SUBAGENT_THINKING_STALL_SECONDS=60.0`, `_SUBAGENT_THINKING_STALL_CHARS=12000` | `agent.py` | ❌ | | **Cooperative stop** | Checks `current_stop_event` (asyncio.Event) before each LLM call, during streaming, and after tool execution. Uses clean generator close — never `task.cancel()`. | None | `agent.py` | ✅ | | **First-message forced planning** | Planning phase always runs on the first user message in a session regardless of `profile.planning_enabled`. | `profile.planning_enabled` (only affects subsequent turns) | `agent.py` | ✅ | @@ -59,7 +59,7 @@ | **Subagent planning phase** | Optionally runs full 3-phase planning before tool loop for subagents. | `profile.subagent_planning_enabled` | `agent.py` | ✅ | | **Parent session ID passthrough** | Sets session ContextVar to parent's ID so session-aware tools resolve paths correctly. | `parent_session_id` param | `agent.py` | ✅ | | **Dedicated subagent tool list** | Uses `profile.subagent_tools` if non-empty; falls back to `profile.enabled_tools`. | `profile.subagent_tools` | `agent.py` | ✅ | -| **ContextVar restoration** | Saves/restores `current_session_id`, `current_model`, `current_user_id`, `current_user_role`, `current_user_info` in `finally` block. | None | `agent.py` | ❌ | +| **ContextVar restoration** | Saves/restores `current_session_id`, `current_model`, `current_user_id`, `current_user_role`, `current_user_info` in `finally` block. | None | `agent.py` | ✅ | ## Planning Pipeline (`navi/core/planning.py`) @@ -78,7 +78,7 @@ | Mechanic | Description | Config / Flags | Files | Docs | |---|---|---|---|---| -| **System prompt caching** | Caches built system prompt per profile ID to avoid rebuilding on every turn. Provides `invalidate_system_prompt_cache()`. | None | `context_builder.py` | ❌ | +| **System prompt caching** | Caches built system prompt per profile ID to avoid rebuilding on every turn. Provides `invalidate_system_prompt_cache()`. | None | `context_builder.py` | ✅ | | **Persona + profile construction** | Prepends global `NAVI_PERSONA` to profile's `system_prompt`, separated by `---`. | `NAVI_PERSONA_FILE` | `context_builder.py` | ✅ | | **Cross-profile awareness** | Appends `## Available profiles` block listing all other profiles with descriptions. | None | `context_builder.py` | ⚠️ | | **Memory summary message** | Injects `## What I remember about the user` if memory store has a summary. | None | `context_builder.py` | ✅ | @@ -246,10 +246,10 @@ | **Streaming protocol** | Full-duplex: client sends `message`, server emits `stream_start`, `thinking_delta`, `stream_delta`, `tool_call`, `stream_end`, etc. | `_HEARTBEAT_INTERVAL=20.0`, `_MAX_REPLAY_EVENTS=500`, `_MAX_IMAGES=10`, `_MAX_IMAGE_BYTES=5MB` | `websocket.py` | ✅ | | **Stop session** | `POST /sessions/{id}/stop` sets stop_event cooperatively. | None | `websocket.py` | ✅ | | **Reconnect/replay** | On connect, if run active, replays buffered events before live stream. | `_MAX_REPLAY_EVENTS=500` | `websocket.py` | ✅ | -| **Image upload validation** | Max 10 images, 5MB each. Strips `data:...;base64,` prefix. | `_MAX_IMAGES=10`, `_MAX_IMAGE_BYTES=5242880` | `websocket.py` | ❌ | +| **Image upload validation** | Max 10 images, 5MB each. Strips `data:...;base64,` prefix. | `_MAX_IMAGES=10`, `_MAX_IMAGE_BYTES=5242880` | `websocket.py` | ✅ | | **Image context annotation** | Appends note telling model that N images are already in multimodal context. | None | `websocket.py` | ❌ | | **File context annotation** | Appends `[Uploaded files on disk: ...]` to user content. | None | `websocket.py` | ⚠️ | -| **Concurrent run guard** | Rejects new messages if `_runs` or `_busy_sessions` already contains session ID. | None | `websocket.py` | ❌ | +| **Concurrent run guard** | Rejects new messages if `_runs` or `_busy_sessions` already contains session ID. | None | `websocket.py` | ✅ | | **Heartbeat keepalive** | Sends `heartbeat` every 20 seconds during idle. | `_HEARTBEAT_INTERVAL=20.0` | `websocket.py` | ✅ | | **User ContextVar propagation** | Sets `current_user_id`, `current_user_role`, `current_user_info` from resolved `User` before agent run. | None | `websocket.py` | ❌ | | **Recall update forwarding** | Subscribes to `RecallUpdate` events and forwards to open WebSockets for affected session. | None | `websocket.py` | ✅ | diff --git a/docs/store.md b/docs/store.md index 76a8969..0e24f12 100644 --- a/docs/store.md +++ b/docs/store.md @@ -17,7 +17,7 @@ ```sql CREATE TABLE session_store ( id SERIAL PRIMARY KEY, - user_id TEXT, + user_id TEXT NOT NULL DEFAULT '', session_id TEXT NOT NULL, scope TEXT NOT NULL, key TEXT NOT NULL, @@ -27,6 +27,8 @@ ); ``` +**Note on `user_id`:** `NULL` is normalized to an empty string before every operation. This prevents duplicate rows for anonymous sessions because PostgreSQL treats `NULL` as distinct in unique constraints. + ## API (`navi.store.KvStore`) ```python diff --git a/docs/tools.md b/docs/tools.md index dc6e936..530847a 100644 --- a/docs/tools.md +++ b/docs/tools.md @@ -109,6 +109,12 @@ --- +## Middleware hooks + +Tools support `before_execute` / `after_execute` middleware hooks registered via `ToolRegistry.add_middleware()`. The built-in `LoggingMiddleware` logs every tool call with duration and result summary. + +Use middleware for cross-cutting concerns: metrics, rate limiting, authorization, audit logging. + ## Hot-reload `reload_tools` tool calls `ToolRegistry.reload_user_tools(tools_dir)`: diff --git a/docs/websocket.md b/docs/websocket.md index 4a6a312..5db8e28 100644 --- a/docs/websocket.md +++ b/docs/websocket.md @@ -25,9 +25,12 @@ - `type` must be `"message"`. Other types return an error frame. - `content` is required and must be non-empty. -- `images`: optional list of base64-encoded images (data URIs accepted; the `data:...;base64,` prefix is stripped server-side). +- `images`: optional list of base64-encoded images (data URIs accepted; the `data:...;base64,` prefix is stripped server-side). **Limits:** max 10 images per message, 5 MB each (decoded). Excess images are rejected with a WebSocket error. - `files`: optional list of uploaded file references (appended to content as `[Uploaded files on disk: ...]`). +### Concurrent run guard +Only one agent run may be active per session at a time. If a second message arrives while a run is already in progress (either a WebSocket run or a headless recall), the server rejects it with a WebSocket error. The client should wait for `stream_end` before sending the next message. + --- ## Messages: server → client diff --git a/navi/store/__init__.py b/navi/store/__init__.py index 15447e9..4bca5a7 100644 --- a/navi/store/__init__.py +++ b/navi/store/__init__.py @@ -12,7 +12,7 @@ _DDL = """ CREATE TABLE IF NOT EXISTS session_store ( id SERIAL PRIMARY KEY, - user_id TEXT, + user_id TEXT NOT NULL DEFAULT '', session_id TEXT NOT NULL, scope TEXT NOT NULL, key TEXT NOT NULL, @@ -22,9 +22,14 @@ ); CREATE INDEX IF NOT EXISTS idx_session_store_lookup ON session_store (user_id, session_id, scope, key); +UPDATE session_store SET user_id = '' WHERE user_id IS NULL; """ +def _norm_uid(user_id: str | None) -> str: + return user_id or "" + + class KvStore: """Simple key-value persistence with user + session + scope scoping.""" @@ -45,15 +50,17 @@ return self._pool async def get(self, user_id: str | None, session_id: str, scope: str, key: str) -> str | None: + uid = _norm_uid(user_id) pool = await self._get_pool() async with pool.acquire() as conn: row = await conn.fetchrow( "SELECT value FROM session_store WHERE user_id = $1 AND session_id = $2 AND scope = $3 AND key = $4", - user_id, session_id, scope, key, + uid, session_id, scope, key, ) return row["value"] if row else None async def set(self, user_id: str | None, session_id: str, scope: str, key: str, value: str) -> None: + uid = _norm_uid(user_id) pool = await self._get_pool() async with pool.acquire() as conn: await conn.execute( @@ -63,30 +70,33 @@ ON CONFLICT (user_id, session_id, scope, key) DO UPDATE SET value = EXCLUDED.value, updated_at = now() """, - user_id, session_id, scope, key, value, + uid, session_id, scope, key, value, ) async def get_all(self, user_id: str | None, session_id: str, scope: str) -> dict[str, str]: + uid = _norm_uid(user_id) pool = await self._get_pool() async with pool.acquire() as conn: rows = await conn.fetch( "SELECT key, value FROM session_store WHERE user_id = $1 AND session_id = $2 AND scope = $3", - user_id, session_id, scope, + uid, session_id, scope, ) return {r["key"]: r["value"] for r in rows} async def delete(self, user_id: str | None, session_id: str, scope: str, key: str) -> None: + uid = _norm_uid(user_id) pool = await self._get_pool() async with pool.acquire() as conn: await conn.execute( "DELETE FROM session_store WHERE user_id = $1 AND session_id = $2 AND scope = $3 AND key = $4", - user_id, session_id, scope, key, + uid, session_id, scope, key, ) async def clear_scope(self, user_id: str | None, session_id: str, scope: str) -> None: + uid = _norm_uid(user_id) pool = await self._get_pool() async with pool.acquire() as conn: await conn.execute( "DELETE FROM session_store WHERE user_id = $1 AND session_id = $2 AND scope = $3", - user_id, session_id, scope, + uid, session_id, scope, ) diff --git a/navi/tools/filesystem.py b/navi/tools/filesystem.py index e915a28..bfcc2f9 100644 --- a/navi/tools/filesystem.py +++ b/navi/tools/filesystem.py @@ -7,6 +7,7 @@ import asyncio import difflib +import re import shutil import stat from datetime import datetime diff --git a/navi/tools/image_view.py b/navi/tools/image_view.py index 1cabb13..1ce1769 100644 --- a/navi/tools/image_view.py +++ b/navi/tools/image_view.py @@ -74,7 +74,7 @@ r.raise_for_status() mime = r.headers.get("content-type", "image/jpeg").split(";")[0].strip() if not mime.startswith("image/") or mime == "image/svg+xml": - raise ValueError(f"URL returned non-raster image content-type: {mime}") + raise ValueError(f"Unsupported image format: {mime} (SVG is not supported, use raster images only)") return r.content, mime async def _read_file(self, path_str: str) -> tuple[bytes, str]: @@ -82,7 +82,7 @@ if not path.exists(): raise FileNotFoundError(f"File not found: {path}") if path.suffix.lower() not in _SUPPORTED: - raise ValueError(f"Unsupported image format: {path.suffix}") + raise ValueError(f"Unsupported image format: {path.suffix} (SVG is not supported, use raster images only)") mime = mimetypes.guess_type(str(path))[0] or "image/jpeg" raw = await asyncio.to_thread(path.read_bytes) return raw, mime diff --git a/tests/unit/tools/test_filesystem.py b/tests/unit/tools/test_filesystem.py index 6755450..b8e8485 100644 --- a/tests/unit/tools/test_filesystem.py +++ b/tests/unit/tools/test_filesystem.py @@ -165,3 +165,107 @@ assert not result.success assert result.error == "old_not_unique" assert f.read_text() == "alpha beta beta" + + async def test_copy_file(self, tool, tmp_path): + src = tmp_path / "src.txt" + src.write_text("copy me") + dst = tmp_path / "dst.txt" + + result = await tool.execute({"action": "copy", "path": str(src), "destination": str(dst)}) + + assert result.success + assert dst.exists() + assert dst.read_text() == "copy me" + + async def test_copy_missing_source(self, tool, tmp_path): + src = tmp_path / "missing.txt" + dst = tmp_path / "dst.txt" + + result = await tool.execute({"action": "copy", "path": str(src), "destination": str(dst)}) + + assert not result.success + assert "not found" in result.output.lower() + + async def test_grep_file(self, tool, tmp_path): + f = tmp_path / "grep.txt" + f.write_text("apple\nbanana\napricot\n") + + result = await tool.execute({"action": "grep", "path": str(f), "pattern": "ap"}) + + assert result.success + assert "apple" in result.output + assert "apricot" in result.output + assert "banana" not in result.output + + async def test_grep_regex(self, tool, tmp_path): + f = tmp_path / "grep.txt" + f.write_text("cat 123\ndog 456\ncat 789\n") + + result = await tool.execute({"action": "grep", "path": str(f), "pattern": r"cat\s+\d+", "regex": True}) + + assert result.success + assert "cat 123" in result.output + assert "cat 789" in result.output + assert "dog 456" not in result.output + + async def test_grep_no_match(self, tool, tmp_path): + f = tmp_path / "grep.txt" + f.write_text("hello\nworld\n") + + result = await tool.execute({"action": "grep", "path": str(f), "pattern": "xyz"}) + + assert result.success + assert "No matches" in result.output + + async def test_grep_missing_pattern(self, tool, tmp_path): + f = tmp_path / "grep.txt" + f.write_text("hello\n") + + result = await tool.execute({"action": "grep", "path": str(f)}) + + assert not result.success + assert "pattern" in result.output.lower() + + async def test_diff_identical(self, tool, tmp_path): + a = tmp_path / "a.txt" + a.write_text("same\ncontent\n") + b = tmp_path / "b.txt" + b.write_text("same\ncontent\n") + + result = await tool.execute({"action": "diff", "path": str(a), "destination": str(b)}) + + assert result.success + assert "identical" in result.output.lower() + + async def test_diff_different(self, tool, tmp_path): + a = tmp_path / "a.txt" + a.write_text("line1\nline2\n") + b = tmp_path / "b.txt" + b.write_text("line1\nmodified\n") + + result = await tool.execute({"action": "diff", "path": str(a), "destination": str(b)}) + + assert result.success + assert "---" in result.output + assert "+++" in result.output + assert "modified" in result.output + + async def test_diff_missing_destination(self, tool, tmp_path): + a = tmp_path / "a.txt" + a.write_text("data") + + result = await tool.execute({"action": "diff", "path": str(a)}) + + assert not result.success + assert "destination" in result.output.lower() + + async def test_diff_directory_rejected(self, tool, tmp_path): + a = tmp_path / "a.txt" + a.write_text("data") + d = tmp_path / "dir" + d.mkdir() + + result = await tool.execute({"action": "diff", "path": str(a), "destination": str(d)}) + + assert not result.success + assert "directories" in result.output.lower()