diff --git a/CLAUDE.md b/CLAUDE.md index 593d854..5f9a806 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -9,13 +9,13 @@ Personal modular AI agent system. FastAPI backend + vanilla JS client. The agent is named Navi — female, loyal, uncensored personal assistant. Runs locally via Ollama. ## Stack -- **Backend**: FastAPI, uvicorn, aiosqlite, structlog, pydantic-settings +- **Backend**: FastAPI, uvicorn, asyncpg, structlog, pydantic-settings - **LLM**: Ollama (primary, cloud + local fallback), OpenAI-compatible backend also wired - **Current models**: `gemma4:31b-cloud` (primary, Ollama Cloud), `gemma4:26b-a4b-it-q4_K_M` (local fallback) - **Multi-server fallback**: `OLLAMA_BACKENDS_FILE=ollama_backends.json` — ordered list of servers; profiles use a model priority list; dead servers/models are blacklisted in-memory until restart - **Thinking**: `ollama_think: bool = True` — model reasoning is enabled and streamed to client - **Client**: Vanilla JS ES modules, marked.js + highlight.js via esm.sh CDN -- **DB**: SQLite via aiosqlite for persistent sessions +- **DB**: PostgreSQL (asyncpg) for persistent sessions, memory, and scheduler - **Run**: `.venv/bin/uvicorn navi.main:app --reload --reload-dir navi --port 8000` ## Key architecture @@ -81,8 +81,8 @@ `reload_user_tools()` drops all non-builtins and reloads from disk. Built-in tools with registry injection: `ReloadToolsTool`, `WriteToolTool`, `ListToolsTool`, `ToolManualTool`. -### Sessions (`navi/core/sqlite_session_store.py`) -Persistent SQLite sessions. `model_dump(mode='json')` required for datetime serialization. +### Sessions (`navi/core/pg_session_store.py`) +Persistent PostgreSQL sessions. `model_dump(mode='json')` required for datetime serialization. Session ID in URL hash for bookmarking. ### WebSocket protocol (`navi/api/websocket.py`) diff --git a/docs/store.md b/docs/store.md new file mode 100644 index 0000000..76a8969 --- /dev/null +++ b/docs/store.md @@ -0,0 +1,51 @@ +# Session KV Store + +PostgreSQL-backed key-value storage for session-scoped data that must survive server restarts. + +## Purpose + +Provides a simple API for tools to persist per-session state without inventing ad-hoc storage mechanisms. + +Current consumers: +- **todo** — task plans (`scope='todo'`) +- **scratchpad** — working notes (`scope='scratchpad'`) + +Future consumers: recall context, wizard state, user preferences per session. + +## Schema + +```sql +CREATE TABLE session_store ( + id SERIAL PRIMARY KEY, + user_id TEXT, + session_id TEXT NOT NULL, + scope TEXT NOT NULL, + key TEXT NOT NULL, + value TEXT NOT NULL DEFAULT '', + updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), + UNIQUE (user_id, session_id, scope, key) +); +``` + +## API (`navi.store.KvStore`) + +```python +from navi.store import KvStore + +store = KvStore("postgresql://...") + +await store.get(user_id, session_id, scope, key) # str | None +await store.set(user_id, session_id, scope, key, value) # None +await store.get_all(user_id, session_id, scope) # dict[str, str] +await store.delete(user_id, session_id, scope, key) # None +await store.clear_scope(user_id, session_id, scope) # None +``` + +- `user_id` — nullable for legacy single-user mode. +- `session_id` — the chat session UUID. +- `scope` — namespace: `todo`, `scratchpad`, or any custom string. +- `key` — arbitrary string identifier within the scope. + +## Wiring + +`KvStore` is created as a singleton in `navi/api/deps.py` (`get_kv_store()`) and injected into `build_default_registries()`, which passes it to `TodoTool` and `ScratchpadTool` on startup. diff --git a/docs/tools.md b/docs/tools.md index 4130e2b..bb793d4 100644 --- a/docs/tools.md +++ b/docs/tools.md @@ -12,12 +12,12 @@ |---|---|---| | `WebSearchTool` | `mcp:navi-web:web_search` | DuckDuckGo search | | `WebViewTool` | `mcp:navi-web:web_view` | Fetch and render a URL | -| `FilesystemTool` | `filesystem` | Read/write/list local files (path restrictions via config) | +| `FilesystemTool` | `filesystem` | Read/write/list/copy/grep/diff local files (path restrictions via config) | | `HttpRequestTool` | `mcp:navi-web:http_request` | Generic HTTP client (GET/POST/etc.) | | `CodeExecTool` | `code_exec` | Execute Python in a subprocess sandbox | | `TerminalTool` | `terminal` | Run shell commands (command allowlist via config) | -| `SshExecTool` | `ssh_exec` | SSH into remote hosts; connection pool keyed by session ID | -| `ImageViewTool` | `image_view` | Load image from path/URL → returns base64 for multimodal LLM | +| `SshExecTool` | `ssh_exec` | SSH exec and SCP file transfer; connection pool keyed by session ID | +| `ImageViewTool` | `image_view` | Load image from path/URL → resize to 1024px, convert to JPEG, return base64 for multimodal LLM | | `TodoTool` | `todo` | Per-session task checklist (set/update/read) | | `ScratchpadTool` | `scratchpad` | Per-session named working notes (write/append/read/clear) | | `ReloadToolsTool` | `reload_tools` | Hot-reload user tools without server restart | @@ -134,7 +134,7 @@ ## Scratchpad and Todo -Both are per-session, stored in-memory keyed by `current_session_id`. +Both are per-session, backed by the PostgreSQL KV-store (`session_store` table) and survive server restarts. **Scratchpad** — named sections for working notes within a task. Operations: `write`, `append`, `read`, `clear`. Subagents get isolated scratchpads (unique UUID-based session ID in `run_ephemeral()`). diff --git a/navi/api/deps.py b/navi/api/deps.py index 4020b70..e2ad61d 100644 --- a/navi/api/deps.py +++ b/navi/api/deps.py @@ -25,6 +25,7 @@ require_user, ) from navi.memory import MemoryStore +from navi.store import KvStore from navi.workers import Worker, build_default_workers from navi.mcp import McpManager, load_mcp_servers from navi.mcp.tools import McpTool @@ -46,6 +47,7 @@ _registries: tuple[ToolRegistry, ProfileRegistry, BackendRegistry, ContextProviderRegistry] | None = None _mcp_manager: McpManager | None = None _scheduler: RecallScheduler | None = None +_kv_store: KvStore | None = None def get_memory_store() -> MemoryStore: @@ -55,6 +57,19 @@ return _memory_store +def _make_kv_store() -> KvStore: + if not settings.database_url: + raise RuntimeError("DATABASE_URL is required. SQLite support has been removed.") + return KvStore(settings.database_url) + + +def get_kv_store() -> KvStore: + global _kv_store + if _kv_store is None: + _kv_store = _make_kv_store() + return _kv_store + + def get_scheduler() -> RecallScheduler: global _scheduler if _scheduler is None: @@ -71,6 +86,7 @@ memory_store=get_memory_store(), session_store=get_session_store(), scheduler=get_scheduler(), + kv_store=get_kv_store(), ) # Wire embedding backend into memory store for vector search. # Uses a dedicated Ollama endpoint when configured, otherwise falls back diff --git a/navi/core/agent.py b/navi/core/agent.py index c7c54c3..46a6f67 100644 --- a/navi/core/agent.py +++ b/navi/core/agent.py @@ -166,26 +166,26 @@ _SUBAGENT_THINKING_STALL_CHARS = 12_000 -def _todo_status_snapshot(session_id: str) -> frozenset[tuple[str, str]]: +async def _todo_status_snapshot(session_id: str) -> frozenset[tuple[str, str]]: """Return a frozenset of (task_text, status) for the current session's todo list. Used by the anti-stall detector to compare todo state before and after an iteration — any status change means the model made real progress. """ from navi.tools.todo import get_task_snapshot - return get_task_snapshot(session_id) + return await get_task_snapshot(session_id) -def _todo_failed_steps(session_id: str) -> frozenset[tuple[int, str]]: +async def _todo_failed_steps(session_id: str) -> frozenset[tuple[int, str]]: """Return a frozenset of (1-based index, task_text) for steps currently marked failed.""" from navi.tools.todo import get_failed_steps - return get_failed_steps(session_id) + return await get_failed_steps(session_id) -def _todo_progress_message(session_id: str, *, first_iteration: bool = False) -> "Message | None": +async def _todo_progress_message(session_id: str, *, first_iteration: bool = False) -> "Message | None": """Build a compact system reminder with current todo state and update discipline.""" from navi.tools.todo import get_progress_message - return get_progress_message(session_id, first_iteration=first_iteration) + return await get_progress_message(session_id, first_iteration=first_iteration) class Agent: @@ -804,14 +804,14 @@ and iteration > 0 and iteration % profile.goal_anchoring_interval == 0 ): - built_ctx.append(self._ctx_builder._build_goal_anchor(session_id, user_message)) + built_ctx.append(await self._ctx_builder._build_goal_anchor(session_id, user_message)) - todo_msg = _todo_progress_message(session_id, first_iteration=(iteration == 0)) + todo_msg = await _todo_progress_message(session_id, first_iteration=(iteration == 0)) if todo_msg: built_ctx.append(todo_msg) # Snapshot todo state before this iteration (for stall detection after) - _todo_snapshot_before = _todo_status_snapshot(session_id) + _todo_snapshot_before = await _todo_status_snapshot(session_id) # Adaptive re-plan: inject queued re-plan message from previous iteration if profile.adaptive_replan_enabled and _replan_msg: @@ -1025,7 +1025,7 @@ # Update anti-stall counters after all tools in this iteration ran. if profile.anti_stall_enabled: # Todo progress signal - if _todo_status_snapshot(session_id) != _todo_snapshot_before: + if (await _todo_status_snapshot(session_id)) != _todo_snapshot_before: _stall_no_todo = 0 else: _stall_no_todo += 1 @@ -1043,7 +1043,7 @@ # Adaptive re-plan: detect steps that were newly marked failed this iteration. if profile.adaptive_replan_enabled: - current_failed = _todo_failed_steps(session_id) + current_failed = await _todo_failed_steps(session_id) new_failures = current_failed - _known_failed _known_failed = current_failed if new_failures: diff --git a/navi/core/context_builder.py b/navi/core/context_builder.py index 6ac4134..4b5d962 100644 --- a/navi/core/context_builder.py +++ b/navi/core/context_builder.py @@ -20,11 +20,11 @@ from navi.profiles.base import AgentProfile -def render_todo_lines(session_id: str) -> list[str]: +async def render_todo_lines(session_id: str) -> list[str]: """Return a list of formatted todo lines for goal anchoring.""" try: from navi.tools.todo import render_todo_lines as _rtl - return _rtl(session_id) + return await _rtl(session_id) except Exception: return [] @@ -156,12 +156,12 @@ pass return out - def _build_goal_anchor(self, session_id: str, user_message: str) -> Message: + async def _build_goal_anchor(self, session_id: str, user_message: str) -> Message: lines = [ "[Goal anchor]", f"Original request: {user_message}", ] - todo_lines = render_todo_lines(session_id) + todo_lines = await render_todo_lines(session_id) if todo_lines: lines.append("Current todo:") lines.extend(todo_lines) diff --git a/navi/core/planning.py b/navi/core/planning.py index 963ae91..41dc574 100644 --- a/navi/core/planning.py +++ b/navi/core/planning.py @@ -422,7 +422,7 @@ from navi.tools.todo import set_tasks from navi.tools._internal.base import current_session_id as _sid_var _sid = _sid_var.get() or "__default__" - set_tasks(_sid, _todo_steps) + await set_tasks(_sid, _todo_steps) log.debug("agent.todo_auto_populated", steps=len(_todo_steps), session=_sid) except Exception: log.warning("agent.todo_auto_populate_failed", exc_info=True) diff --git a/navi/core/registry.py b/navi/core/registry.py index c1de963..8a6bc85 100644 --- a/navi/core/registry.py +++ b/navi/core/registry.py @@ -165,6 +165,7 @@ memory_store=None, session_store=None, scheduler=None, + kv_store=None, ) -> tuple[ToolRegistry, ProfileRegistry, BackendRegistry, ContextProviderRegistry]: """Build and populate registries with all built-in components.""" from navi.core.ai_helper import AIHelper @@ -198,7 +199,8 @@ builtins = [FilesystemTool(ai_helper=ai_helper), CodeExecTool(), TerminalTool(), SshExecTool(), ImageViewTool(), ShareFileTool(), ContentPublishTool(), - TodoTool(), ScratchpadTool(), ReflectTool(ai_helper=ai_helper), + TodoTool(kv_store=kv_store), ScratchpadTool(kv_store=kv_store), + ReflectTool(ai_helper=ai_helper), reload_tool, list_tool, manual_tool, mcp_status_tool, create_mcp_server_tool, test_mcp_tool_tool, schedule_recall_tool, manage_recall_tool] diff --git a/navi/memory/_facts.py b/navi/memory/_facts.py index 2e71e6c..ee85b67 100644 --- a/navi/memory/_facts.py +++ b/navi/memory/_facts.py @@ -294,6 +294,21 @@ return await conn.fetchval(q, *params) or 0 + async def get_categories(self, user_id: str | None = None) -> list[str]: + pool = await self._get_pool() + async with pool.acquire() as conn: + if user_id is None: + rows = await conn.fetch( + "SELECT DISTINCT category FROM memory_facts WHERE user_id IS NULL ORDER BY category" + ) + else: + rows = await conn.fetch( + "SELECT DISTINCT category FROM memory_facts WHERE user_id = $1 ORDER BY category", + user_id, + ) + return [r["category"] for r in rows] + + def _row_to_dict(row: asyncpg.Record) -> dict: val = row["updated_at"] updated_at = val.isoformat() if hasattr(val, "isoformat") else str(val) diff --git a/navi/store/__init__.py b/navi/store/__init__.py new file mode 100644 index 0000000..15447e9 --- /dev/null +++ b/navi/store/__init__.py @@ -0,0 +1,92 @@ +"""KV store backed by PostgreSQL for session-scoped data. + +Used by todo, scratchpad, and any future per-session state that must +survive server restarts. +""" + +import asyncio +from typing import Any + +import asyncpg + +_DDL = """ +CREATE TABLE IF NOT EXISTS session_store ( + id SERIAL PRIMARY KEY, + user_id TEXT, + session_id TEXT NOT NULL, + scope TEXT NOT NULL, + key TEXT NOT NULL, + value TEXT NOT NULL DEFAULT '', + updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), + UNIQUE (user_id, session_id, scope, key) +); +CREATE INDEX IF NOT EXISTS idx_session_store_lookup + ON session_store (user_id, session_id, scope, key); +""" + + +class KvStore: + """Simple key-value persistence with user + session + scope scoping.""" + + def __init__(self, dsn: str) -> None: + self._dsn = dsn + self._pool: asyncpg.Pool | None = None + self._lock = asyncio.Lock() + + async def _get_pool(self) -> asyncpg.Pool: + if self._pool is not None: + return self._pool + async with self._lock: + if self._pool is None: + pool = await asyncpg.create_pool(self._dsn) + async with pool.acquire() as conn: + await conn.execute(_DDL) + self._pool = pool + return self._pool + + async def get(self, user_id: str | None, session_id: str, scope: str, key: str) -> str | None: + pool = await self._get_pool() + async with pool.acquire() as conn: + row = await conn.fetchrow( + "SELECT value FROM session_store WHERE user_id = $1 AND session_id = $2 AND scope = $3 AND key = $4", + user_id, session_id, scope, key, + ) + return row["value"] if row else None + + async def set(self, user_id: str | None, session_id: str, scope: str, key: str, value: str) -> None: + pool = await self._get_pool() + async with pool.acquire() as conn: + await conn.execute( + """ + INSERT INTO session_store (user_id, session_id, scope, key, value, updated_at) + VALUES ($1, $2, $3, $4, $5, now()) + ON CONFLICT (user_id, session_id, scope, key) + DO UPDATE SET value = EXCLUDED.value, updated_at = now() + """, + user_id, session_id, scope, key, value, + ) + + async def get_all(self, user_id: str | None, session_id: str, scope: str) -> dict[str, str]: + pool = await self._get_pool() + async with pool.acquire() as conn: + rows = await conn.fetch( + "SELECT key, value FROM session_store WHERE user_id = $1 AND session_id = $2 AND scope = $3", + user_id, session_id, scope, + ) + return {r["key"]: r["value"] for r in rows} + + async def delete(self, user_id: str | None, session_id: str, scope: str, key: str) -> None: + pool = await self._get_pool() + async with pool.acquire() as conn: + await conn.execute( + "DELETE FROM session_store WHERE user_id = $1 AND session_id = $2 AND scope = $3 AND key = $4", + user_id, session_id, scope, key, + ) + + async def clear_scope(self, user_id: str | None, session_id: str, scope: str) -> None: + pool = await self._get_pool() + async with pool.acquire() as conn: + await conn.execute( + "DELETE FROM session_store WHERE user_id = $1 AND session_id = $2 AND scope = $3", + user_id, session_id, scope, + ) diff --git a/navi/tools/code_exec.py b/navi/tools/code_exec.py index 2f3af74..b3499fc 100644 --- a/navi/tools/code_exec.py +++ b/navi/tools/code_exec.py @@ -57,11 +57,9 @@ "type": "string", "description": "Python code to execute", }, - "language": { + "working_dir": { "type": "string", - "enum": ["python"], - "description": "Programming language (currently only python)", - "default": "python", + "description": "Working directory for the script (optional).", }, }, "required": ["code"], @@ -69,19 +67,11 @@ async def execute(self, params: dict) -> ToolResult: code = params["code"] - language = params.get("language", "python") - - if language != "python": - return ToolResult( - success=False, - output=f"Language '{language}' is not supported.", - error="unsupported_language", - ) role = current_user_role.get() user_id = current_user_id.get(None) - cwd = _resolve_working_dir(None) + cwd = _resolve_working_dir(params.get("working_dir")) if user_id and role != "admin": # Write temp file inside the sandbox so file I/O in user code diff --git a/navi/tools/filesystem.py b/navi/tools/filesystem.py index 629e110..99d7668 100644 --- a/navi/tools/filesystem.py +++ b/navi/tools/filesystem.py @@ -235,33 +235,14 @@ name = "filesystem" description = ( "Operate on the local filesystem. " - "ALWAYS prefer AI actions over manual read+write — they produce more accurate results " - "and handle files of any size automatically:\n" - " • query — use INSTEAD of read when you need to extract or look up information. " - "Examples: 'what arguments does function X take?', 'on which line is class Y defined?', " - "'does this config contain key Z?', 'list all TODO comments'. " - "Pass the question in 'question'.\n" - "Four editing modes — choose the cheapest one that fits your situation:\n" - " • write — overwrite the ENTIRE file with new content. Use only for new files or total rewrites.\n" - " • edit — deterministic exact-text replacement. You must know the precise old text (must occur exactly once). " - "Pass 'old' and 'new'. Fastest and safest for small tweaks.\n" - " • edit_lines — deterministic line-based edit. You must know exact line numbers. " - "Pass 'operations' array with replace/delete/insert ops. Fastest for bulk structural changes.\n" - " • smart_edit — AI-powered semantic edit. Use when you know the intent but not the exact replacement. " - "Examples: 'rename function foo to bar', 'add a docstring to method X', " - "'remove all commented-out code', 'change timeout from 30 to 60'. " - "Pass the instruction in 'instruction'. Consumes extra tokens.\n" - "Standard actions (use only when AI actions are not applicable): " - "read — raw file text (offset+limit for large files, numbered=true for line numbers); " - "append — add text to end; " - "list — directory contents with sizes; " - "find — search files by glob pattern downward; " - "find_up — walk up the directory tree to find a file by name (pattern param); returns its path or 'not found'; " - "info — size, line count, dates, permissions; " - "move — rename or move; " - "delete — remove file or directory tree; " - "exists — check if path exists. " - "Tip: call info before reading an unknown file to check its size first." + "Decision tree — pick the cheapest option for your task:\n" + "1. Need to extract info from a file — use 'query' (pass 'question').\n" + "2. Small exact edit — use 'edit' with 'old' and 'new' text, or 'edit_lines' with line numbers.\n" + "3. Semantic edit (intent known, exact text unknown) — use 'smart_edit' (pass 'instruction').\n" + "4. Create or fully rewrite — use 'write' (pass 'content').\n" + "5. Everything else — standard actions: read, append, list, find, find_up, grep, diff, " + "info, copy, move, delete, exists, mkdir.\n" + "Tip: call 'info' before reading an unknown file to check size." ) parameters = { "type": "object", @@ -270,8 +251,8 @@ "type": "string", "enum": [ "read", "write", "append", "edit", "edit_lines", "list", "find", "find_up", - "info", "move", "delete", "exists", "mkdir", - "query", "smart_edit", + "info", "move", "copy", "delete", "exists", "mkdir", + "query", "smart_edit", "grep", "diff", ], "description": "Operation to perform.", }, @@ -335,6 +316,10 @@ "type": "boolean", "description": "Full recursive directory tree (for list, default false).", }, + "regex": { + "type": "boolean", + "description": "Use regex matching for grep (default false = literal substring search).", + }, "question": { "type": "string", "description": ( @@ -398,11 +383,14 @@ case "find_up": return await asyncio.to_thread(self._find_up, path, params) case "info": return await asyncio.to_thread(self._info, path) case "move": return await asyncio.to_thread(self._move, path, params) + case "copy": return await asyncio.to_thread(self._copy, path, params) case "delete": return await asyncio.to_thread(self._delete, path) case "exists": return ToolResult(success=True, output="true" if path.exists() else "false") case "mkdir": return await asyncio.to_thread(self._mkdir, path) case "query": return await self._query(path, params) case "smart_edit": return await self._smart_edit(path, params) + case "grep": return await asyncio.to_thread(self._grep, path, params) + case "diff": return await asyncio.to_thread(self._diff, path, params) case _: return ToolResult(success=False, output=f"Unknown action: {action}", error="invalid_action") @@ -694,6 +682,102 @@ path.unlink() return ToolResult(success=True, output=f"Deleted: {path}") + def _copy(self, path: Path, params: dict) -> ToolResult: + dest_raw = params.get("destination") + if not dest_raw: + return ToolResult(success=False, output="'destination' is required for copy", error="missing_destination") + dest = _check_path(dest_raw) + if dest is None: + return ToolResult(success=False, output=f"Access denied: destination '{dest_raw}' outside allowed paths.", error="access_denied") + if not path.exists(): + return ToolResult(success=False, output=f"Not found: {path}", error="not_found") + dest.parent.mkdir(parents=True, exist_ok=True) + shutil.copy2(str(path), str(dest)) + return ToolResult(success=True, output=f"Copied: {path} → {dest}") + + def _grep(self, path: Path, params: dict) -> ToolResult: + pattern = params.get("pattern", "").strip() + if not pattern: + return ToolResult(success=False, output="'pattern' is required for grep", error="missing_pattern") + use_regex = params.get("regex", False) + glob_filter = params.get("glob", "") + + try: + if use_regex: + compiled = re.compile(pattern, re.IGNORECASE) + else: + compiled = re.compile(re.escape(pattern), re.IGNORECASE) + except re.error as e: + return ToolResult(success=False, output=f"Invalid pattern: {e}", error="invalid_pattern") + + max_results = 200 + matches: list[str] = [] + files_searched = 0 + + if path.is_file(): + files_to_search = [path] + elif path.is_dir(): + if glob_filter: + files_to_search = list(path.rglob(glob_filter)) + else: + files_to_search = list(path.rglob("*")) + files_to_search = [f for f in files_to_search if f.is_file()] + else: + return ToolResult(success=False, output=f"Not found: {path}", error="not_found") + + for fpath in files_to_search: + if len(matches) >= max_results: + break + files_searched += 1 + try: + text = fpath.read_text(encoding="utf-8", errors="replace") + for i, line in enumerate(text.splitlines(), 1): + if compiled.search(line): + rel = fpath.relative_to(path) if fpath != path else fpath.name + matches.append(f"{rel}:{i}: {line}") + if len(matches) >= max_results: + break + except Exception: + continue + + if not matches: + return ToolResult(success=True, output=f"No matches for '{pattern}' in {path}") + + note = f" ⚠ showing first {max_results}" if len(matches) == max_results else "" + header = f"[{len(matches)} matches for '{pattern}' in {path} ({files_searched} files searched){note}]\n" + return ToolResult(success=True, output=header + "\n".join(matches)) + + def _diff(self, path: Path, params: dict) -> ToolResult: + dest_raw = params.get("destination") + if not dest_raw: + return ToolResult(success=False, output="'destination' is required for diff (path to second file)", error="missing_destination") + dest = _check_path(dest_raw) + if dest is None: + return ToolResult(success=False, output=f"Access denied: destination '{dest_raw}' outside allowed paths.", error="access_denied") + if not path.exists(): + return ToolResult(success=False, output=f"Not found: {path}", error="not_found") + if not dest.exists(): + return ToolResult(success=False, output=f"Not found: {dest}", error="not_found") + if path.is_dir() or dest.is_dir(): + return ToolResult(success=False, output="diff works on files, not directories.", error="is_directory") + + try: + a_lines = path.read_text(encoding="utf-8", errors="replace").splitlines() + b_lines = dest.read_text(encoding="utf-8", errors="replace").splitlines() + except Exception as e: + return ToolResult(success=False, output=f"Read error: {e}", error=str(e)) + + diff = list(difflib.unified_diff( + [l + "\n" for l in a_lines], + [l + "\n" for l in b_lines], + fromfile=f"a/{path.name}", + tofile=f"b/{dest.name}", + lineterm="", + )) + if not diff: + return ToolResult(success=True, output="Files are identical.") + return ToolResult(success=True, output="\n".join(diff)) + # ── AI action handlers ──────────────────────────────────────────────────── def _require_ai(self) -> ToolResult | None: diff --git a/navi/tools/image_view.py b/navi/tools/image_view.py index fcfd134..1cabb13 100644 --- a/navi/tools/image_view.py +++ b/navi/tools/image_view.py @@ -1,20 +1,27 @@ """Image view tool — load an image from a file path or URL for the LLM to analyse. -The image is returned as base64 and injected into the conversation so the LLM -can actually see it (not just read a text description of it). +Images are resized to max 1024 px on the longest side and converted to JPEG +(~85 quality) before base64 encoding to keep context size reasonable. + +The processed image is returned as base64 and injected into the conversation +so the LLM can actually see it (not just read a text description of it). """ import asyncio import base64 +import io import mimetypes from pathlib import Path import httpx +from PIL import Image from ._internal.base import Tool, ToolResult _TIMEOUT = 30 _SUPPORTED = {".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp"} +_MAX_SIZE = 1024 +_JPEG_QUALITY = 85 class ImageViewTool(Tool): @@ -47,8 +54,9 @@ else: raw, mime = await self._read_file(source) - b64 = base64.b64encode(raw).decode() - size_kb = len(raw) // 1024 + processed, mime = await asyncio.to_thread(self._preprocess, raw) + b64 = base64.b64encode(processed).decode() + size_kb = len(processed) // 1024 return ToolResult( success=True, output=( @@ -65,6 +73,8 @@ r = await client.get(url) r.raise_for_status() mime = r.headers.get("content-type", "image/jpeg").split(";")[0].strip() + if not mime.startswith("image/") or mime == "image/svg+xml": + raise ValueError(f"URL returned non-raster image content-type: {mime}") return r.content, mime async def _read_file(self, path_str: str) -> tuple[bytes, str]: @@ -76,3 +86,17 @@ mime = mimetypes.guess_type(str(path))[0] or "image/jpeg" raw = await asyncio.to_thread(path.read_bytes) return raw, mime + + @staticmethod + def _preprocess(raw: bytes) -> tuple[bytes, str]: + """Resize to _MAX_SIZE on longest side, convert to JPEG, return (bytes, mime).""" + img = Image.open(io.BytesIO(raw)) + img = img.convert("RGB") + w, h = img.size + if w > _MAX_SIZE or h > _MAX_SIZE: + ratio = _MAX_SIZE / max(w, h) + new_size = (int(w * ratio), int(h * ratio)) + img = img.resize(new_size, Image.LANCZOS) + buf = io.BytesIO() + img.save(buf, format="JPEG", quality=_JPEG_QUALITY, optimize=True) + return buf.getvalue(), "image/jpeg" diff --git a/navi/tools/memory.py b/navi/tools/memory.py index b390e10..244c29d 100644 --- a/navi/tools/memory.py +++ b/navi/tools/memory.py @@ -190,17 +190,10 @@ async def _list(self) -> ToolResult: user_id = current_user_id.get(None) - facts = await self._store.get_all_facts(user_id=user_id) - if not facts: + categories = await self._store.get_categories(user_id=user_id) + if not categories: return ToolResult(success=True, output="Memory is empty.") - - lines = [] - for f in facts: - prov = "" - if f.get("source"): - prov += f" (src: {f['source']}" - if f.get("confidence"): - prov += f", conf: {f['confidence']}" - prov += ")" - lines.append(f"[{f['category']}] {f['key']}: {f['value']}{prov}") - return ToolResult(success=True, output=f"{len(facts)} fact(s) in memory:\n" + "\n".join(lines)) + return ToolResult( + success=True, + output="Categories in memory:\n" + "\n".join(f" • {c}" for c in categories), + ) diff --git a/navi/tools/scratchpad.py b/navi/tools/scratchpad.py index ee9613d..8b1f02b 100644 --- a/navi/tools/scratchpad.py +++ b/navi/tools/scratchpad.py @@ -1,15 +1,35 @@ -"""Session-scoped scratchpad for capturing working notes during task execution.""" +"""Session-scoped scratchpad for capturing working notes during task execution — backed by PostgreSQL KV store.""" from __future__ import annotations -from ._internal.base import Tool, ToolResult, current_session_id +from navi.tools._internal.base import Tool, ToolResult, current_session_id, current_user_id -# session_id → {section_name: content} -_pads: dict[str, dict[str, str]] = {} +# Global KV store reference — injected at startup by registry.py +_kv_store = None -def get_section(session_id: str, section: str) -> str: +def set_kv_store(kv) -> None: + """Inject the shared KvStore instance (called once at startup).""" + global _kv_store + _kv_store = kv + + +def _sid() -> str: + return current_session_id.get() or "__default__" + + +def _uid() -> str | None: + return current_user_id.get(None) + + +async def get_section(session_id: str, section: str) -> str: """Read one scratchpad section for the given session. Returns '' if absent.""" - return _pads.get(session_id, {}).get(section, "") + if _kv_store is None: + return "" + try: + val = await _kv_store.get(None, session_id, "scratchpad", section) + return val or "" + except Exception: + return "" class ScratchpadTool(Tool): @@ -54,49 +74,61 @@ "required": ["op"], } + def __init__(self, kv_store=None) -> None: + if kv_store is not None: + set_kv_store(kv_store) + async def execute(self, params: dict) -> ToolResult: - sid = current_session_id.get() or "__default__" + sid = _sid() op = params.get("op") section: str | None = params.get("section") or None content: str = params.get("content", "") - pad = _pads.setdefault(sid, {}) - if op == "write": if not content: return ToolResult(success=False, output="", error="'content' is required for 'write'") key = section or "main" - pad[key] = content + if _kv_store is not None: + await _kv_store.set(_uid(), sid, "scratchpad", key, content) return ToolResult(success=True, output=f"[{key}] written ({len(content)} chars).") if op == "append": if not content: return ToolResult(success=False, output="", error="'content' is required for 'append'") key = section or "main" - existing = pad.get(key, "") - pad[key] = (existing + "\n" + content).lstrip("\n") if existing else content - return ToolResult(success=True, output=f"[{key}] updated ({len(pad[key])} chars total).") + if _kv_store is not None: + existing = await _kv_store.get(_uid(), sid, "scratchpad", key) or "" + new = (existing + "\n" + content).lstrip("\n") if existing else content + await _kv_store.set(_uid(), sid, "scratchpad", key, new) + return ToolResult(success=True, output=f"[{key}] updated ({len(new)} chars total).") + return ToolResult(success=True, output=f"[{key}] updated.") if op == "read": + if _kv_store is None: + return ToolResult(success=True, output="Scratchpad is empty.") if section is not None: - text = pad.get(section) + text = await _kv_store.get(_uid(), sid, "scratchpad", section) if not text: return ToolResult(success=True, output=f"[{section}] is empty.") return ToolResult(success=True, output=f"[{section}]:\n{text}") # No section → read all - if not pad: + all_data = await _kv_store.get_all(_uid(), sid, "scratchpad") + if not all_data: return ToolResult(success=True, output="Scratchpad is empty.") - parts = [f"[{k}]:\n{v}" for k, v in pad.items()] + parts = [f"[{k}]:\n{v}" for k, v in all_data.items()] return ToolResult(success=True, output="\n\n".join(parts)) if op == "clear": + if _kv_store is None: + return ToolResult(success=True, output="Scratchpad cleared.") if section is not None: - removed = pad.pop(section, None) + existing = await _kv_store.get(_uid(), sid, "scratchpad", section) + await _kv_store.delete(_uid(), sid, "scratchpad", section) return ToolResult( success=True, - output=f"[{section}] cleared." if removed else f"[{section}] was already empty.", + output=f"[{section}] cleared." if existing else f"[{section}] was already empty.", ) - pad.clear() + await _kv_store.clear_scope(_uid(), sid, "scratchpad") return ToolResult(success=True, output="Scratchpad cleared.") return ToolResult(success=False, output="", error=f"Unknown op: {op!r}") diff --git a/navi/tools/spawn_agent.py b/navi/tools/spawn_agent.py index 24fc492..3e3f626 100644 --- a/navi/tools/spawn_agent.py +++ b/navi/tools/spawn_agent.py @@ -126,7 +126,7 @@ # Read parent scratchpad context_transfer section and pass it to the sub-agent. parent_sid = current_session_id.get() - context_transfer = get_section(parent_sid, "context_transfer") if parent_sid else "" + context_transfer = (await get_section(parent_sid, "context_transfer")) if parent_sid else "" tool_source = ( selected_profile.subagent_tools diff --git a/navi/tools/ssh_exec.py b/navi/tools/ssh_exec.py index 02ab69a..ca7fd74 100644 --- a/navi/tools/ssh_exec.py +++ b/navi/tools/ssh_exec.py @@ -137,9 +137,14 @@ parameters = { "type": "object", "properties": { + "action": { + "type": "string", + "enum": ["exec", "scp"], + "description": "Action to perform: exec (run command, default) or scp (transfer file).", + }, "command": { "type": "string", - "description": "Shell command to run on the remote host", + "description": "Shell command to run on the remote host (required for action=exec).", }, "host": { "type": "string", @@ -165,16 +170,29 @@ "type": "string", "description": "Named connection from ssh_hosts.json — shortcut that provides host/user/creds automatically", }, + "local_path": { + "type": "string", + "description": "Local file path (required for action=scp).", + }, + "remote_path": { + "type": "string", + "description": "Remote file path (required for action=scp).", + }, + "direction": { + "type": "string", + "enum": ["upload", "download"], + "description": "Transfer direction for scp: upload (local→remote) or download (remote→local).", + }, "timeout": { "type": "integer", "description": f"Timeout in seconds (default {_TIMEOUT})", }, }, - "required": ["command"], + "required": [], } async def execute(self, params: dict) -> ToolResult: - command = params["command"].strip() + action = params.get("action", "exec") timeout = int(params.get("timeout") or _TIMEOUT) connect_kwargs = self._resolve(params) @@ -185,6 +203,13 @@ ) return ToolResult(success=False, output=msg, error=msg) + if action == "scp": + return await self._run_scp(params, connect_kwargs, timeout) + + command = (params.get("command") or "").strip() + if not command: + return ToolResult(success=False, output="'command' is required for exec action.", error="missing_command") + session_id = current_session_id.get() host = connect_kwargs["host"] port = int(connect_kwargs.get("port", 22)) @@ -247,6 +272,39 @@ # Should not be reached return ToolResult(success=False, output="SSH: unexpected retry exhaustion", error="retry_failed") + async def _run_scp(self, params: dict, connect_kwargs: dict, timeout: int) -> ToolResult: + local_path = (params.get("local_path") or "").strip() + remote_path = (params.get("remote_path") or "").strip() + direction = params.get("direction", "upload") + + if not local_path: + return ToolResult(success=False, output="'local_path' is required for scp.", error="missing_local_path") + if not remote_path: + return ToolResult(success=False, output="'remote_path' is required for scp.", error="missing_remote_path") + + host = connect_kwargs["host"] + port = int(connect_kwargs.get("port", 22)) + username = connect_kwargs.get("username", os.environ.get("USER", "root")) + ssh_target = (host, remote_path) if port == 22 else (host, port, remote_path) + + try: + if direction == "upload": + await asyncio.wait_for( + asyncssh.scp(local_path, ssh_target, **connect_kwargs), + timeout=timeout, + ) + return ToolResult(success=True, output=f"Uploaded: {local_path} → {username}@{host}:{remote_path}") + else: + await asyncio.wait_for( + asyncssh.scp(ssh_target, local_path, **connect_kwargs), + timeout=timeout, + ) + return ToolResult(success=True, output=f"Downloaded: {username}@{host}:{remote_path} → {local_path}") + except (TimeoutError, asyncio.TimeoutError): + return ToolResult(success=False, output=f"SCP timed out after {timeout}s", error="timeout") + except Exception as e: + return ToolResult(success=False, output=f"SCP error: {e}", error=str(e)) + def _resolve(self, params: dict) -> dict | None: # Named connection from ssh_hosts.json takes precedence connection = params.get("connection", "").strip() diff --git a/navi/tools/todo.py b/navi/tools/todo.py index e2d04b2..02abe2b 100644 --- a/navi/tools/todo.py +++ b/navi/tools/todo.py @@ -1,9 +1,10 @@ -"""Session-scoped task plan manager.""" +"""Session-scoped task plan manager — backed by PostgreSQL KV store.""" from __future__ import annotations +import json from dataclasses import dataclass, field -from ._internal.base import Tool, ToolResult, current_session_id +from navi.tools._internal.base import Tool, ToolResult, current_session_id, current_user_id _STATUS_ICON: dict[str, str] = { "pending": "○", @@ -20,8 +21,42 @@ validation: str = "" # how the result was verified (required for done, encouraged for failed) -# In-memory store: session_id → task list (ephemeral, lives with the server process) -_plans: dict[str, list[_Task]] = {} +# Global KV store reference — injected at startup by registry.py +_kv_store = None + + +def set_kv_store(kv) -> None: + """Inject the shared KvStore instance (called once at startup).""" + global _kv_store + _kv_store = kv + + +def _sid() -> str: + return current_session_id.get() or "__default__" + + +def _uid() -> str | None: + return current_user_id.get(None) + + +async def _load_tasks(sid: str) -> list[_Task]: + if _kv_store is None: + return [] + raw = await _kv_store.get(_uid(), sid, "todo", "tasks") + if not raw: + return [] + try: + data = json.loads(raw) + return [_Task(**item) for item in data] + except Exception: + return [] + + +async def _save_tasks(sid: str, tasks: list[_Task]) -> None: + if _kv_store is None: + return + data = [{"text": t.text, "status": t.status, "validation": t.validation} for t in tasks] + await _kv_store.set(_uid(), sid, "todo", "tasks", json.dumps(data)) class TodoTool(Tool): @@ -77,24 +112,30 @@ "required": ["op"], } + def __init__(self, kv_store=None) -> None: + if kv_store is not None: + set_kv_store(kv_store) + async def execute(self, params: dict) -> ToolResult: - sid = current_session_id.get() or "__default__" + sid = _sid() op = params.get("op") if op == "set": raw = params.get("tasks") or [] if not raw: return ToolResult(success=False, output="", error="'tasks' list is required for 'set'") - _plans[sid] = [_Task(text=str(t)) for t in raw] - return ToolResult(success=True, output=self._render(sid)) + tasks = [_Task(text=str(t)) for t in raw] + await _save_tasks(sid, tasks) + return ToolResult(success=True, output=self._render(sid, tasks)) if op == "view": - if sid not in _plans or not _plans[sid]: + tasks = await _load_tasks(sid) + if not tasks: return ToolResult(success=True, output="No plan set for this session.") - return ToolResult(success=True, output=self._render(sid)) + return ToolResult(success=True, output=self._render(sid, tasks)) if op == "update": - tasks = _plans.get(sid) + tasks = await _load_tasks(sid) if not tasks: return ToolResult(success=False, output="", error="No plan set. Use 'set' first.") idx = params.get("index") @@ -120,10 +161,11 @@ if status == "failed" and not validation: # Soft prompt — don't block, but encourage explanation tasks[idx - 1].status = status + await _save_tasks(sid, tasks) return ToolResult( success=True, output=( - self._render(sid) + "\n\n" + self._render(sid, tasks) + "\n\n" f"[Tip: next time add a 'validation' field when marking a step failed — " "describe what went wrong and what you tried. " "This helps with re-planning.]" @@ -132,16 +174,17 @@ tasks[idx - 1].status = status tasks[idx - 1].validation = validation - return ToolResult(success=True, output=self._render(sid)) + await _save_tasks(sid, tasks) + return ToolResult(success=True, output=self._render(sid, tasks)) if op == "clear": - _plans.pop(sid, None) + if _kv_store is not None: + await _kv_store.clear_scope(_uid(), sid, "todo") return ToolResult(success=True, output="Plan cleared.") return ToolResult(success=False, output="", error=f"Unknown op: {op!r}") - def _render(self, sid: str) -> str: - tasks = _plans.get(sid, []) + def _render(self, sid: str, tasks: list[_Task]) -> str: if not tasks: return "Plan is empty." n = len(tasks) @@ -155,32 +198,34 @@ return "\n".join(lines) -# ── Public API for agent.py (avoids tight coupling to internal _plans dict) ── +# ── Public API for agent.py (avoids tight coupling to internal storage) ── -def get_task_snapshot(session_id: str) -> frozenset[tuple[str, str]]: +async def get_task_snapshot(session_id: str) -> frozenset[tuple[str, str]]: """Return a frozenset of (task_text, status) for the session's todo list.""" try: - return frozenset((t.text, t.status) for t in _plans.get(session_id, [])) + tasks = await _load_tasks(session_id) + return frozenset((t.text, t.status) for t in tasks) except Exception: return frozenset() -def get_failed_steps(session_id: str) -> frozenset[tuple[int, str]]: +async def get_failed_steps(session_id: str) -> frozenset[tuple[int, str]]: """Return a frozenset of (1-based index, task_text) for failed steps.""" try: + tasks = await _load_tasks(session_id) return frozenset( (i + 1, t.text) - for i, t in enumerate(_plans.get(session_id, [])) + for i, t in enumerate(tasks) if t.status == "failed" ) except Exception: return frozenset() -def get_progress_message(session_id: str, *, first_iteration: bool = False) -> "Message | None": +async def get_progress_message(session_id: str, *, first_iteration: bool = False) -> "Message | None": """Build a compact system reminder with current todo state.""" try: - tasks = _plans.get(session_id, []) + tasks = await _load_tasks(session_id) if not tasks: return None n = len(tasks) @@ -240,14 +285,15 @@ return None -def set_tasks(session_id: str, task_texts: list[str]) -> None: +async def set_tasks(session_id: str, task_texts: list[str]) -> None: """Auto-populate the todo list from plan steps.""" - _plans[session_id] = [_Task(text=s) for s in task_texts] + tasks = [_Task(text=s) for s in task_texts] + await _save_tasks(session_id, tasks) -def render_todo_lines(session_id: str) -> list[str]: +async def render_todo_lines(session_id: str) -> list[str]: """Return a list of formatted todo lines for goal anchoring.""" - tasks = _plans.get(session_id, []) + tasks = await _load_tasks(session_id) if not tasks: return [] lines = [] diff --git a/navi/tools/tool_manual.py b/navi/tools/tool_manual.py index 12a88ec..7d0c3e2 100644 --- a/navi/tools/tool_manual.py +++ b/navi/tools/tool_manual.py @@ -18,7 +18,7 @@ "properties": { "tool_name": { "type": "string", - "description": "Name of the tool to look up, e.g. 'create_mcp_server'", + "description": "Name of the tool to look up, e.g. 'filesystem'", } }, "required": ["tool_name"], diff --git a/tests/unit/core/test_context_builder.py b/tests/unit/core/test_context_builder.py index aae60c6..f07f80a 100644 --- a/tests/unit/core/test_context_builder.py +++ b/tests/unit/core/test_context_builder.py @@ -59,8 +59,9 @@ class TestBuildGoalAnchor: def test_includes_original_request(self): + import asyncio builder = ContextBuilder(profile_registry=make_profile_registry()) - msg = builder._build_goal_anchor("sess-1", "Write tests") + msg = asyncio.run(builder._build_goal_anchor("sess-1", "Write tests")) assert msg.role == "system" assert "Original request: Write tests" in msg.content assert "Stay on track" in msg.content diff --git a/tests/unit/tools/test_code_exec.py b/tests/unit/tools/test_code_exec.py index 45e7dcb..3f240bc 100644 --- a/tests/unit/tools/test_code_exec.py +++ b/tests/unit/tools/test_code_exec.py @@ -25,11 +25,6 @@ # stderr is captured but the tool may or may not consider it an error assert "err" in (result.output or result.error or "") - async def test_unsupported_language(self, tool): - result = await tool.execute({"code": "echo hi", "language": "bash"}) - assert not result.success - assert "not supported" in result.output - async def test_syntax_error(self, tool): result = await tool.execute({"code": "print("}) assert not result.success