diff --git a/docs/testing.md b/docs/testing.md index 067d406..732fb67 100644 --- a/docs/testing.md +++ b/docs/testing.md @@ -75,13 +75,13 @@ | 1 | `navi.profiles.base` | 9 | ✅ Done | | 2 | `navi.memory.store` | 18 | ✅ Done | | 2 | `navi.memory.extractor` | 11 | ✅ Done | -| 3 | `navi.api.routes` | — | ⏳ Pending | -| 3 | `navi.api.websocket` | — | ⏳ Pending | -| 4 | `navi.core.agent` | — | ⏳ Pending | -| 4 | `navi.core.planning` | — | ⏳ Pending | -| 5 | `navi.tools.filesystem` | — | ⏳ Pending | -| 5 | `navi.tools.code_exec` | — | ⏳ Pending | -| 5 | `navi.tools.terminal` | — | ⏳ Pending | +| 3 | `navi.api.routes` | 19 | ✅ Done | +| 3 | `navi.api.websocket` | 7 | ✅ Done | +| 4 | `navi.core.agent` | 9 | ✅ Done | +| 4 | `navi.core.planning` | 5 | ✅ Done | +| 5 | `navi.tools.filesystem` | 13 | ✅ Done | +| 5 | `navi.tools.code_exec` | 5 | ✅ Done | +| 5 | `navi.tools.terminal` | 4 | ✅ Done | ## Running tests diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py new file mode 100644 index 0000000..96423e2 --- /dev/null +++ b/tests/integration/conftest.py @@ -0,0 +1,96 @@ +"""Integration test fixtures — FastAPI app with mocked dependencies.""" + +from typing import AsyncGenerator + +import pytest +from fastapi.testclient import TestClient + +from navi.core.events import StreamEnd, TextDelta +from navi.core.registry import BackendRegistry +from navi.core.session import InMemorySessionStore, Session +from navi.llm.base import Message +from tests.conftest_factory import FakeLLMBackend, make_profile_registry, make_registry_with_tools + + +class FakeAgent: + """Deterministic agent for integration tests. + + Yields pre-configured events via run_stream(). + run() returns a fixed string. + """ + + def __init__(self, stream_events=None, run_response="Hello") -> None: + self._stream_events = stream_events or [] + self._run_response = run_response + + async def run(self, session_id: str, user_message: str, images=None) -> str: + return self._run_response + + async def run_stream(self, session_id, user_message, images=None, display_message=None): + for ev in self._stream_events: + yield ev + + +@pytest.fixture +def mock_deps(monkeypatch): + """Patch navi.api.deps internal caches so FastAPI routes see mocked stores. + + We patch the module-level singletons (_session_store, _registries, etc.) + rather than the getter functions because FastAPI's Depends() captures the + original function object at import time; replacing the attribute on the + module does not affect the reference stored in the route decorator. + """ + import navi.api.deps as deps + import navi.config as _config + + # Ensure database_url is set so _make_memory_store doesn't raise + _config.settings.database_url = "postgresql://fake" + + store = InMemorySessionStore() + profiles = make_profile_registry() + tools = make_registry_with_tools() + backends = BackendRegistry() + backends.register("ollama", FakeLLMBackend()) + + # Patch internal singletons so original getter functions return our fakes + monkeypatch.setattr(deps, "_session_store", store) + monkeypatch.setattr(deps, "_memory_store", None) + monkeypatch.setattr(deps, "_registries", (tools, profiles, backends, None)) + monkeypatch.setattr(deps, "_workers", []) + + # Patch get_agent in routes that import it directly (messages.py) + fake_agent = FakeAgent() + monkeypatch.setattr("navi.api.routes.messages.get_agent", lambda: fake_agent) + # websocket imports deps lazily inside the handler — no need to patch directly + + return { + "session_store": store, + "profiles": profiles, + "tools": tools, + "backends": backends, + "agent": fake_agent, + } + + +@pytest.fixture +def client(mock_deps): + """FastAPI TestClient with mocked dependencies.""" + from navi.main import app + + return TestClient(app) + + +@pytest.fixture +def make_session(mock_deps): + """Helper to create a session in the mocked store.""" + store = mock_deps["session_store"] + + async def _make(profile_id="secretary", messages=None): + session = await store.create(profile_id) + if messages: + for m in messages: + session.messages.append(m) + await store.save(session) + return session + + return _make diff --git a/tests/integration/test_api_routes.py b/tests/integration/test_api_routes.py new file mode 100644 index 0000000..112d59d --- /dev/null +++ b/tests/integration/test_api_routes.py @@ -0,0 +1,146 @@ +"""Integration tests for REST API routes.""" + +import pytest + +from navi.llm.base import Message + + +class TestHealth: + def test_health(self, client): + response = client.get("/health") + assert response.status_code == 200 + data = response.json() + assert data["status"] == "ok" + assert "embed" in data + + def test_health_embed(self, client): + response = client.get("/health/embed") + assert response.status_code == 200 + data = response.json() + assert "ok" in data + + +class TestAgents: + def test_list_profiles(self, client): + response = client.get("/agents/profiles") + assert response.status_code == 200 + data = response.json() + assert len(data) >= 2 + assert any(p["id"] == "secretary" for p in data) + + def test_list_tools(self, client): + response = client.get("/agents/tools") + assert response.status_code == 200 + data = response.json() + assert len(data) >= 2 + names = {t["name"] for t in data} + assert "test_tool" in names + + +class TestSessions: + async def test_create_session(self, client): + response = client.post("/sessions", json={"profile_id": "secretary"}) + assert response.status_code == 201 + data = response.json() + assert "session_id" in data + assert data["profile_id"] == "secretary" + + def test_create_session_invalid_profile(self, client): + response = client.post("/sessions", json={"profile_id": "nonexistent"}) + assert response.status_code == 404 + + @pytest.mark.anyio + async def test_list_sessions(self, client, make_session): + session = await make_session("secretary", [Message(role="user", content="hi")]) + response = client.get("/sessions") + assert response.status_code == 200 + data = response.json() + assert any(s["session_id"] == session.id for s in data) + + @pytest.mark.anyio + async def test_get_session(self, client, make_session): + session = await make_session("secretary") + response = client.get(f"/sessions/{session.id}") + assert response.status_code == 200 + data = response.json() + assert data["session_id"] == session.id + assert data["profile_id"] == "secretary" + + def test_get_session_not_found(self, client): + response = client.get("/sessions/nonexistent") + assert response.status_code == 404 + + @pytest.mark.anyio + async def test_pin_session(self, client, make_session): + session = await make_session("secretary") + response = client.patch(f"/sessions/{session.id}/pin", json={"pinned": True}) + assert response.status_code == 200 + data = response.json() + assert data["pinned"] is True + + def test_pin_session_not_found(self, client): + response = client.patch("/sessions/nonexistent/pin", json={"pinned": True}) + assert response.status_code == 404 + + @pytest.mark.anyio + async def test_get_context(self, client, make_session, mock_deps): + session = await make_session("secretary", [Message(role="user", content="hello")]) + session.context.append(Message(role="user", content="hello")) + await mock_deps["session_store"].save(session) + response = client.get(f"/sessions/{session.id}/context") + assert response.status_code == 200 + data = response.json() + assert data["session_id"] == session.id + assert data["message_count"] == 1 + + def test_get_context_not_found(self, client): + response = client.get("/sessions/nonexistent/context") + assert response.status_code == 404 + + @pytest.mark.anyio + async def test_get_planning(self, client, make_session): + session = await make_session("secretary") + session.planning_logs.append({"phases": {}}) + response = client.get(f"/sessions/{session.id}/planning") + assert response.status_code == 200 + data = response.json() + assert data["session_id"] == session.id + assert len(data["logs"]) == 1 + + def test_get_planning_not_found(self, client): + response = client.get("/sessions/nonexistent/planning") + assert response.status_code == 404 + + @pytest.mark.anyio + async def test_delete_session(self, client, make_session): + session = await make_session("secretary") + response = client.delete(f"/sessions/{session.id}") + assert response.status_code == 204 + assert client.get(f"/sessions/{session.id}").status_code == 404 + + def test_delete_session_not_found(self, client): + response = client.delete("/sessions/nonexistent") + assert response.status_code == 404 + + +class TestMessages: + @pytest.mark.anyio + async def test_send_message(self, client, make_session, monkeypatch): + session = await make_session("secretary") + + class DummyAgent: + async def run(self, session_id, user_message, images=None): + return "Response text" + + # Patch the Agent class in deps so the original get_agent() (captured by + # Depends() at import time) instantiates our dummy when called. + monkeypatch.setattr("navi.api.deps.Agent", lambda *a, **kw: DummyAgent()) + response = client.post(f"/sessions/{session.id}/messages", json={"content": "hi"}) + assert response.status_code == 200 + data = response.json() + assert data["role"] == "assistant" + assert data["content"] == "Response text" + + def test_send_message_not_found(self, client): + response = client.post("/sessions/nonexistent/messages", json={"content": "hi"}) + assert response.status_code == 404 diff --git a/tests/integration/test_websocket.py b/tests/integration/test_websocket.py new file mode 100644 index 0000000..b701f3b --- /dev/null +++ b/tests/integration/test_websocket.py @@ -0,0 +1,178 @@ +"""Integration tests for WebSocket endpoint.""" + +import asyncio +import json + +import pytest +from fastapi.testclient import TestClient + +from navi.core.events import StreamEnd, TextDelta +from navi.llm.base import Message + + +class FakeAgent: + """Deterministic agent for WebSocket tests.""" + + def __init__(self, stream_events=None, run_response="Hello") -> None: + self._stream_events = stream_events or [] + self._run_response = run_response + + async def run(self, session_id: str, user_message: str, images=None) -> str: + return self._run_response + + async def run_stream(self, session_id, user_message, images=None, display_message=None): + for ev in self._stream_events: + yield ev + + +@pytest.fixture(autouse=True) +def _clear_runs(monkeypatch): + """Clear the module-level _runs dict before every WS test.""" + import navi.api.websocket as ws_mod + + ws_mod._runs.clear() + yield + + +@pytest.fixture +def fake_agent_ws(monkeypatch, mock_deps): + """Patch Agent in websocket module so handlers use FakeAgent.""" + import navi.api.websocket as ws_mod + + events = [ + TextDelta(delta="Hello"), + StreamEnd(full_content="Hello"), + ] + fake = FakeAgent(stream_events=events) + monkeypatch.setattr(ws_mod, "Agent", lambda *a, **kw: fake) + return fake + + +class TestWebSocketConnect: + def test_invalid_session(self, client): + from starlette.testclient import WebSocketDisconnect + + with pytest.raises(WebSocketDisconnect): + with client.websocket_connect("/ws/sessions/nonexistent"): + pass + + @pytest.mark.anyio + async def test_send_message(self, client, make_session, fake_agent_ws): + session = await make_session("secretary") + with client.websocket_connect(f"/ws/sessions/{session.id}") as ws: + # First message on a fresh connection is session_sync + m0 = ws.receive_json() + assert m0["type"] == "session_sync" + ws.send_json({"type": "message", "content": "hi"}) + # FakeAgent emits: stream_start (handler) → stream_delta → stream_end + msgs: list[dict] = [] + for _ in range(3): + msgs.append(ws.receive_json()) + + types = [m["type"] for m in msgs] + assert "stream_start" in types + assert any(m.get("type") == "stream_delta" for m in msgs) + assert any(m.get("type") == "stream_end" for m in msgs) + + @pytest.mark.anyio + async def test_reconnect_replay(self, client, make_session, monkeypatch): + """Reconnect while a run is active — replay buffer should emit past events.""" + import navi.api.websocket as ws_mod + + session = await make_session("secretary") + + # Inject an active run with buffered events + run = ws_mod._AgentRun() + run.events = [ + {"type": "stream_start"}, + {"type": "stream_delta", "delta": "hello"}, + ] + ws_mod._runs[session.id] = run + + with client.websocket_connect(f"/ws/sessions/{session.id}") as ws: + msgs = _collect_until_done(ws, max_messages=5) + + types = [m["type"] for m in msgs] + assert "stream_start" in types + assert "replay_start" in types + assert any(m.get("type") == "stream_delta" for m in msgs) + assert "replay_end" in types + + # Clean up injected run + ws_mod._runs.pop(session.id, None) + if run.task: + run.task.cancel() + + @pytest.mark.anyio + async def test_invalid_json(self, client, make_session): + session = await make_session("secretary") + with client.websocket_connect(f"/ws/sessions/{session.id}") as ws: + # First message on a fresh connection is session_sync + m0 = ws.receive_json() + assert m0["type"] == "session_sync" + ws.send_text("not json") + msg = ws.receive_json() + assert msg["type"] == "error" + + @pytest.mark.anyio + async def test_missing_content(self, client, make_session): + session = await make_session("secretary") + with client.websocket_connect(f"/ws/sessions/{session.id}") as ws: + # First message on a fresh connection is session_sync + m0 = ws.receive_json() + assert m0["type"] == "session_sync" + ws.send_json({"type": "message"}) + msg = ws.receive_json() + assert msg["type"] == "error" + + +class TestStopSession: + @pytest.mark.anyio + async def test_stop_no_active_run(self, client, make_session): + session = await make_session("secretary") + response = client.post(f"/sessions/{session.id}/stop") + assert response.status_code == 200 + data = response.json() + assert data["ok"] is False + + @pytest.mark.anyio + async def test_stop_active_run(self, client, make_session, monkeypatch): + import navi.api.websocket as ws_mod + + session = await make_session("secretary") + + # Start a long-running agent task in background + run = ws_mod._AgentRun() + run.task = asyncio.create_task(asyncio.sleep(10)) + ws_mod._runs[session.id] = run + + response = client.post(f"/sessions/{session.id}/stop") + assert response.status_code == 200 + data = response.json() + assert data["ok"] is True + assert run.stop_event.is_set() + run.task.cancel() + try: + await run.task + except asyncio.CancelledError: + pass + + +# ── Helpers ────────────────────────────────────────────────────────────────── + +def _collect_until_done(ws, max_messages: int = 10) -> list[dict]: + """Collect websocket messages until stream_end, error, or max messages.""" + msgs: list[dict] = [] + for _ in range(max_messages): + try: + raw = ws.receive_text() + except Exception: + break + try: + msg = json.loads(raw) + except json.JSONDecodeError: + continue + msgs.append(msg) + if msg.get("type") in ("stream_end", "error", "stream_stopped", "session_sync"): + break + return msgs diff --git a/tests/unit/core/test_agent_context_size.py b/tests/unit/core/test_agent_context_size.py new file mode 100644 index 0000000..383a7bd --- /dev/null +++ b/tests/unit/core/test_agent_context_size.py @@ -0,0 +1,51 @@ +"""Unit tests for Agent._check_context_size.""" + +import pytest + +from navi.config import Settings +from navi.core.agent import Agent +from navi.exceptions import ContextTooLargeError +from navi.llm.base import Message + + +class TestCheckContextSize: + @pytest.fixture(autouse=True) + def _patch_settings(self, monkeypatch): + """Use a small context window so tests don't need huge strings.""" + import navi.core.agent as _agent_mod + + monkeypatch.setattr( + _agent_mod, + "settings", + Settings( + _env_file=None, + ollama_num_ctx=128, + output_reserve_tokens=8, + navi_persona_file="", + ), + ) + + def test_empty_context_ok(self): + agent = Agent(None, None, None, None) + agent._check_context_size([]) + + def test_small_context_ok(self): + agent = Agent(None, None, None, None) + msgs = [Message(role="user", content="hi")] + agent._check_context_size(msgs) + + def test_exceeds_window_raises(self): + agent = Agent(None, None, None, None) + # 128 ctx - 8 reserve = 120 available; each char ~0.25 tokens + # Need > 120 * 4 = 480 chars to exceed + msgs = [Message(role="user", content="x" * 600)] + with pytest.raises(ContextTooLargeError) as exc_info: + agent._check_context_size(msgs) + assert "Context too large" in str(exc_info.value) + + def test_images_count_toward_limit(self): + agent = Agent(None, None, None, None) + # 500 tokens per image; with 128 ctx we exceed immediately + msgs = [Message(role="user", content="look", images=["b64"])] + with pytest.raises(ContextTooLargeError): + agent._check_context_size(msgs) diff --git a/tests/unit/core/test_agent_stream_guard.py b/tests/unit/core/test_agent_stream_guard.py new file mode 100644 index 0000000..f83705b --- /dev/null +++ b/tests/unit/core/test_agent_stream_guard.py @@ -0,0 +1,84 @@ +"""Unit tests for Agent._iter_stream_guarded.""" + +import asyncio + +import pytest + +from navi.core.agent import _iter_stream_guarded +from navi.exceptions import LLMBackendError +from navi.llm.base import LLMChunk + + +async def _yield_chunks(chunks, delay: float = 0): + for c in chunks: + if delay: + await asyncio.sleep(delay) + yield c + + +class TestIterStreamGuarded: + async def test_yields_all_chunks(self): + chunks = [LLMChunk(delta="a"), LLMChunk(delta="b"), LLMChunk(delta="")] + result = [] + async for c in _iter_stream_guarded( + _yield_chunks(chunks), stop_event=None, first_chunk_timeout=5, chunk_timeout=5 + ): + result.append(c) + assert len(result) == 3 + assert [c.delta for c in result] == ["a", "b", ""] + + async def test_respects_stop_event(self): + chunks = [LLMChunk(delta="a")] + + async def _slow(): + yield LLMChunk(delta="a") + await asyncio.sleep(10) + yield LLMChunk(delta="b") + + stop = asyncio.Event() + stop.set() + result = [] + async for c in _iter_stream_guarded( + _slow(), stop_event=stop, first_chunk_timeout=5, chunk_timeout=5 + ): + result.append(c) + # Should stop before 'b' because stop_event is set + assert len(result) == 1 + assert result[0].delta == "a" + + async def test_first_chunk_timeout(self): + async def _very_slow(): + await asyncio.sleep(10) + yield LLMChunk(delta="a") + + with pytest.raises(LLMBackendError) as exc_info: + async for _ in _iter_stream_guarded( + _very_slow(), stop_event=None, first_chunk_timeout=0.1, chunk_timeout=5 + ): + pass + assert "timed out" in str(exc_info.value).lower() + + async def test_chunk_timeout(self): + async def _slow_gap(): + yield LLMChunk(delta="a") + await asyncio.sleep(10) + yield LLMChunk(delta="b") + + with pytest.raises(LLMBackendError) as exc_info: + async for _ in _iter_stream_guarded( + _slow_gap(), stop_event=None, first_chunk_timeout=5, chunk_timeout=0.1 + ): + pass + assert "timed out" in str(exc_info.value).lower() + + async def test_empty_stream(self): + async def _empty(): + return + yield # make it a generator + + result = [] + async for c in _iter_stream_guarded( + _empty(), stop_event=None, first_chunk_timeout=1, chunk_timeout=1 + ): + result.append(c) + assert result == [] diff --git a/tests/unit/core/test_planning.py b/tests/unit/core/test_planning.py new file mode 100644 index 0000000..99f0f25 --- /dev/null +++ b/tests/unit/core/test_planning.py @@ -0,0 +1,27 @@ +"""Unit tests for navi.core.planning.""" + +import pytest + +from navi.core.planning import _parse_plan_steps + + +class TestParsePlanSteps: + def test_basic_numbered_list(self): + text = "**Steps:**\n1. First step\n2. Second step\n3. Third step" + assert _parse_plan_steps(text) == ["First step", "Second step", "Third step"] + + def test_parenthesised_numbers(self): + text = "**Steps:**\n1) Step one\n2) Step two" + assert _parse_plan_steps(text) == ["Step one", "Step two"] + + def test_ignores_bracket_prefixes(self): + text = "**Steps:**\n1. [TOOL] Do thing\n2. Normal step" + assert _parse_plan_steps(text) == ["Normal step"] + + def test_empty_steps_section(self): + text = "**Steps:**\n\n**Notes:** nothing" + assert _parse_plan_steps(text) == [] + + def test_no_steps_section(self): + text = "Some random text without steps" + assert _parse_plan_steps(text) == [] diff --git a/tests/unit/tools/test_code_exec.py b/tests/unit/tools/test_code_exec.py new file mode 100644 index 0000000..45e7dcb --- /dev/null +++ b/tests/unit/tools/test_code_exec.py @@ -0,0 +1,35 @@ +"""Unit tests for code_exec tool.""" + +import pytest + +from navi.tools.code_exec import CodeExecTool + + +class TestCodeExecTool: + @pytest.fixture + def tool(self): + return CodeExecTool() + + async def test_hello_world(self, tool): + result = await tool.execute({"code": "print('hello')"}) + assert result.success + assert "hello" in result.output + + async def test_math(self, tool): + result = await tool.execute({"code": "print(2 + 3)"}) + assert result.success + assert "5" in result.output + + async def test_stderr(self, tool): + result = await tool.execute({"code": "import sys; print('err', file=sys.stderr)"}) + # stderr is captured but the tool may or may not consider it an error + assert "err" in (result.output or result.error or "") + + async def test_unsupported_language(self, tool): + result = await tool.execute({"code": "echo hi", "language": "bash"}) + assert not result.success + assert "not supported" in result.output + + async def test_syntax_error(self, tool): + result = await tool.execute({"code": "print("}) + assert not result.success diff --git a/tests/unit/tools/test_filesystem.py b/tests/unit/tools/test_filesystem.py new file mode 100644 index 0000000..c28d91a --- /dev/null +++ b/tests/unit/tools/test_filesystem.py @@ -0,0 +1,110 @@ +"""Unit tests for filesystem tool (non-AI operations).""" + +import pytest + +from navi.tools.filesystem import FilesystemTool, _check_path +from navi.tools.base import ToolResult + + +class TestCheckPath: + @pytest.fixture(autouse=True) + def _allow_all(self, monkeypatch): + import navi.tools.filesystem as _fs_mod + monkeypatch.setattr(_fs_mod.settings, "fs_allowed_paths", "*") + + def test_resolves_relative(self): + p = _check_path(".") + assert p is not None + assert p.is_dir() + + def test_expands_tilde(self): + p = _check_path("~") + assert p is not None + + def test_rejects_empty(self): + # Empty path resolves to cwd, so _check_path returns a Path, not None + assert _check_path("") is not None + + def test_restricted_paths(self, monkeypatch, tmp_path): + import navi.tools.filesystem as _fs_mod + allowed = tmp_path / "allowed" + allowed.mkdir() + blocked = tmp_path / "blocked" + blocked.mkdir() + monkeypatch.setattr(_fs_mod.settings, "fs_allowed_paths", str(allowed)) + assert _check_path(str(allowed / "file.txt")) is not None + assert _check_path(str(blocked / "file.txt")) is None + + +class TestFilesystemToolBasic: + @pytest.fixture(autouse=True) + def _allow_all(self, monkeypatch): + import navi.tools.filesystem as _fs_mod + monkeypatch.setattr(_fs_mod.settings, "fs_allowed_paths", "*") + + @pytest.fixture + def tool(self): + return FilesystemTool() + + async def test_read_file(self, tool, tmp_path): + f = tmp_path / "hello.txt" + f.write_text("world") + result = await tool.execute({"action": "read", "path": str(f)}) + assert result.success + assert "world" in result.output + + async def test_read_missing(self, tool, tmp_path): + f = tmp_path / "missing.txt" + result = await tool.execute({"action": "read", "path": str(f)}) + assert not result.success + assert "not found" in result.output.lower() or "does not exist" in result.output.lower() + + async def test_write_file(self, tool, tmp_path): + f = tmp_path / "write.txt" + result = await tool.execute({"action": "write", "path": str(f), "content": "data"}) + assert result.success + assert f.read_text() == "data" + + async def test_list_dir(self, tool, tmp_path): + (tmp_path / "a.txt").write_text("a") + (tmp_path / "b.txt").write_text("b") + result = await tool.execute({"action": "list", "path": str(tmp_path)}) + assert result.success + assert "a.txt" in result.output + assert "b.txt" in result.output + + async def test_exists_true(self, tool, tmp_path): + f = tmp_path / "exists.txt" + f.write_text("") + result = await tool.execute({"action": "exists", "path": str(f)}) + assert result.success + assert "true" in result.output.lower() or "exists" in result.output.lower() + + async def test_exists_false(self, tool, tmp_path): + f = tmp_path / "no.txt" + result = await tool.execute({"action": "exists", "path": str(f)}) + # exists returns success=True because the check itself succeeded; + # the answer is in the output text + assert result.success + assert "false" in result.output.lower() + + async def test_delete_file(self, tool, tmp_path): + f = tmp_path / "del.txt" + f.write_text("bye") + result = await tool.execute({"action": "delete", "path": str(f)}) + assert result.success + assert not f.exists() + + async def test_info_file(self, tool, tmp_path): + f = tmp_path / "info.txt" + f.write_text("content") + result = await tool.execute({"action": "info", "path": str(f)}) + assert result.success + assert "info.txt" in result.output + + async def test_append_file(self, tool, tmp_path): + f = tmp_path / "append.txt" + f.write_text("first") + result = await tool.execute({"action": "append", "path": str(f), "content": "-second"}) + assert result.success + assert f.read_text() == "first-second" diff --git a/tests/unit/tools/test_terminal.py b/tests/unit/tools/test_terminal.py new file mode 100644 index 0000000..72a702a --- /dev/null +++ b/tests/unit/tools/test_terminal.py @@ -0,0 +1,30 @@ +"""Unit tests for terminal tool.""" + +import pytest + +from navi.tools.terminal import TerminalTool + + +class TestTerminalTool: + @pytest.fixture + def tool(self): + return TerminalTool() + + async def test_echo(self, tool): + result = await tool.execute({"command": "echo hello"}) + assert result.success + assert "hello" in result.output + + async def test_pwd(self, tool): + result = await tool.execute({"command": "pwd"}) + assert result.success + assert "/" in result.output + + async def test_empty_command(self, tool): + result = await tool.execute({"command": " "}) + assert not result.success + assert "empty" in result.output.lower() + + async def test_invalid_command(self, tool): + result = await tool.execute({"command": "this_command_does_not_exist_12345"}) + assert not result.success