diff --git a/navi/core/container.py b/navi/core/container.py index 2fa8463..81390fa 100644 --- a/navi/core/container.py +++ b/navi/core/container.py @@ -105,7 +105,7 @@ mcp_manager = McpManager() await mcp_manager.load_all() - from navi.tools.terminal_manager import TerminalManager + from navi.tools._internal.terminal_manager import TerminalManager terminal_manager = TerminalManager() terminal_manager.start() diff --git a/navi/tools/_internal/terminal_manager.py b/navi/tools/_internal/terminal_manager.py new file mode 100644 index 0000000..5a89c7c --- /dev/null +++ b/navi/tools/_internal/terminal_manager.py @@ -0,0 +1,360 @@ +"""Persistent terminal session manager. + +Owns named subprocess terminals keyed by (session_id, terminal_name). +Supports background long-running processes, streaming output via the +agent event sink, and automatic idle cleanup. +""" + +from __future__ import annotations + +import asyncio +import dataclasses +from collections import deque +from dataclasses import dataclass, field +from datetime import datetime, timezone +from pathlib import Path +from typing import TYPE_CHECKING, Any + +import structlog + +if TYPE_CHECKING: + from navi.core.events import AgentEvent + +log = structlog.get_logger() + +_MAX_OUTPUT_BUFFER = 500 # lines kept per terminal for status queries +_DEFAULT_CLEANUP_INTERVAL = 60 # seconds between idle checks +_DEFAULT_MAX_IDLE = 1800 # 30 minutes +_MAX_TERMINALS_PER_SESSION = 10 + + +@dataclass +class TerminalSession: + """A single named persistent terminal subprocess.""" + + name: str + description: str + session_id: str + command: str + cwd: Path | None = None + env: dict[str, str] | None = None + background: bool = False + proc: asyncio.subprocess.Process | None = None + stdin: asyncio.StreamWriter | None = None + stdout_task: asyncio.Task | None = None + stderr_task: asyncio.Task | None = None + output_buffer: deque[str] = field(default_factory=lambda: deque(maxlen=_MAX_OUTPUT_BUFFER)) + created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) + last_active: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) + closed: bool = False + close_reason: str | None = None + + def touch(self) -> None: + self.last_active = datetime.now(timezone.utc) + + def is_alive(self) -> bool: + if self.closed: + return False + if self.proc is None: + return False + return self.proc.returncode is None + + def summary(self) -> dict[str, Any]: + return { + "name": self.name, + "description": self.description, + "command": self.command, + "status": "busy" if self.is_alive() else "idle", + "pid": self.proc.pid if self.proc else None, + "cwd": str(self.cwd) if self.cwd else None, + "background": self.background, + "created_at": self.created_at.isoformat(), + "last_active": self.last_active.isoformat(), + "uptime_seconds": round((datetime.now(timezone.utc) - self.created_at).total_seconds(), 1), + } + + def status(self) -> dict[str, Any]: + out = self.summary() + out["output_tail"] = list(self.output_buffer)[-20:] + out["returncode"] = self.proc.returncode if self.proc else None + out["closed"] = self.closed + out["close_reason"] = self.close_reason + return out + + +class TerminalManager: + """Owns all persistent terminal sessions across all Navi sessions.""" + + def __init__(self, max_idle_seconds: int = _DEFAULT_MAX_IDLE) -> None: + self._sessions: dict[tuple[str, str], TerminalSession] = {} + self._max_idle = max_idle_seconds + self._cleanup_task: asyncio.Task | None = None + + # ── Lifecycle ──────────────────────────────────────────────────────────── + + def start(self) -> None: + if self._cleanup_task is None or self._cleanup_task.done(): + self._cleanup_task = asyncio.create_task(self._cleanup_loop()) + + async def shutdown(self) -> None: + if self._cleanup_task and not self._cleanup_task.done(): + self._cleanup_task.cancel() + try: + await self._cleanup_task + except asyncio.CancelledError: + pass + for key in list(self._sessions): + await self._close_one(key, reason="shutdown") + + async def _cleanup_loop(self) -> None: + while True: + try: + await asyncio.sleep(_DEFAULT_CLEANUP_INTERVAL) + self.cleanup_idle() + except asyncio.CancelledError: + break + except Exception: + log.exception("terminal_manager.cleanup_loop_error") + + # ── Public API ─────────────────────────────────────────────────────────── + + def list(self, session_id: str) -> list[dict[str, Any]]: + return [ + session.summary() + for (sid, name), session in self._sessions.items() + if sid == session_id and not session.closed + ] + + def status(self, session_id: str, name: str) -> dict[str, Any] | None: + session = self._sessions.get((session_id, name)) + if session is None or session.closed: + return None + session.touch() + return session.status() + + async def open( + self, + session_id: str, + name: str, + description: str, + command: str, + background: bool = False, + cwd: Path | None = None, + env: dict[str, str] | None = None, + timeout: int = 20, + event_sink: asyncio.Queue | None = None, + exec_tokens: list[str] | None = None, + ) -> TerminalSession: + """Open a new persistent terminal session. + + If *background* is True the process is started and the coroutine + returns immediately — output is streamed via *event_sink*. + If False, it waits for the process to finish and returns. + + Pass *exec_tokens* to run via ``create_subprocess_exec`` instead of + ``create_subprocess_shell`` (enforces allowlist restrictions). + """ + key = (session_id, name) + if key in self._sessions and not self._sessions[key].closed: + raise ValueError(f"Terminal '{name}' already exists for session {session_id}") + + session_count = sum( + 1 for (sid, _), s in self._sessions.items() + if sid == session_id and not s.closed + ) + if session_count >= _MAX_TERMINALS_PER_SESSION: + raise ValueError( + f"Max {_MAX_TERMINALS_PER_SESSION} terminals per session reached." + ) + + session = TerminalSession( + name=name, + description=description, + session_id=session_id, + command=command, + cwd=cwd, + env=env, + background=background, + ) + self._sessions[key] = session + + if exec_tokens: + proc = await asyncio.create_subprocess_exec( + *exec_tokens, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + stdin=asyncio.subprocess.PIPE if background else None, + cwd=cwd, + env=env, + ) + else: + proc = await asyncio.create_subprocess_shell( + command, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + stdin=asyncio.subprocess.PIPE if background else None, + cwd=cwd, + env=env, + ) + session.proc = proc + session.stdin = proc.stdin + + # Start background readers + session.stdout_task = asyncio.create_task( + self._read_stream(proc.stdout, session, "stdout", event_sink), + name=f"term-{session_id}-{name}-stdout", + ) + session.stderr_task = asyncio.create_task( + self._read_stream(proc.stderr, session, "stderr", event_sink), + name=f"term-{session_id}-{name}-stderr", + ) + + if not background: + # Wait for completion, gather output + try: + await asyncio.wait_for(proc.wait(), timeout=timeout) + except asyncio.TimeoutError: + proc.terminate() + try: + await asyncio.wait_for(proc.wait(), timeout=2.0) + except asyncio.TimeoutError: + proc.kill() + await proc.wait() + session.close_reason = "timeout" + # Ensure readers finish before returning so output_buffer is complete + await self._cancel_readers(session) + + session.touch() + return session + + async def send_input(self, session_id: str, name: str, text: str) -> bool: + session = self._sessions.get((session_id, name)) + if session is None or session.closed or session.stdin is None: + return False + try: + session.stdin.write(text.encode()) + await session.stdin.drain() + session.touch() + return True + except (BrokenPipeError, ConnectionResetError): + log.warning( + "terminal_manager.send_input_pipe_closed", + session_id=session_id, + name=name, + ) + return False + except Exception: + log.exception("terminal_manager.send_input_error", session_id=session_id, name=name) + return False + + async def close(self, session_id: str, name: str) -> bool: + key = (session_id, name) + if key not in self._sessions: + return False + await self._close_one(key, reason="explicit") + return True + + async def close_all(self, session_id: str) -> int: + """Close every terminal belonging to *session_id*. Returns count closed.""" + keys = [(sid, name) for (sid, name) in list(self._sessions) if sid == session_id] + for key in keys: + await self._close_one(key, reason="session_ended") + return len(keys) + + def cleanup_idle(self) -> None: + now = datetime.now(timezone.utc) + stale = [ + key + for key, session in list(self._sessions.items()) + if not session.closed and (now - session.last_active).total_seconds() > self._max_idle + ] + for key in stale: + asyncio.create_task( + self._close_one_safe(key, reason="idle_timeout"), + name=f"term-cleanup-{key[0]}-{key[1]}", + ) + + async def _close_one_safe(self, key: tuple[str, str], reason: str) -> None: + try: + await self._close_one(key, reason=reason) + except Exception: + log.exception("terminal_manager.cleanup_close_error", key=key) + + # ── Internals ──────────────────────────────────────────────────────────── + + async def _read_stream( + self, + stream: asyncio.StreamReader | None, + session: TerminalSession, + stream_name: str, + event_sink: asyncio.Queue | None, + ) -> None: + if stream is None: + return + try: + while True: + line = await stream.readline() + if not line: + break + text = line.decode(errors="replace") + session.output_buffer.append(text) + session.touch() + if event_sink is not None: + try: + from navi.core.events import TerminalOutputDelta + + await event_sink.put( + TerminalOutputDelta( + terminal_name=session.name, + stream=stream_name, + delta=text, + ) + ) + except Exception: + log.exception("terminal_manager.sink_put_error") + except asyncio.CancelledError: + pass + except Exception: + log.exception("terminal_manager.read_stream_error") + + async def _cancel_readers(self, session: TerminalSession) -> None: + for task in (session.stdout_task, session.stderr_task): + if task and not task.done(): + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + + async def _close_one(self, key: tuple[str, str], reason: str) -> None: + session = self._sessions.pop(key, None) + if session is None: + return + if session.closed: + return + session.closed = True + session.close_reason = reason + + if session.proc and session.proc.returncode is None: + try: + session.proc.terminate() + await asyncio.wait_for(session.proc.wait(), timeout=2.0) + except asyncio.TimeoutError: + session.proc.kill() + try: + await asyncio.wait_for(session.proc.wait(), timeout=1.0) + except asyncio.TimeoutError: + pass + except Exception: + log.exception("terminal_manager.proc_kill_error", key=key) + + await self._cancel_readers(session) + + if session.stdin: + try: + session.stdin.close() + await session.stdin.wait_closed() + except Exception: + pass + + log.info("terminal_manager.closed", session_id=key[0], name=key[1], reason=reason) diff --git a/navi/tools/terminal.py b/navi/tools/terminal.py index f199023..bf661a3 100644 --- a/navi/tools/terminal.py +++ b/navi/tools/terminal.py @@ -32,7 +32,7 @@ from navi.config import settings from ._internal.base import Tool, ToolContext, ToolResult, current_event_sink, current_user_id, current_user_role -from .terminal_manager import TerminalManager +from ..tools._internal.terminal_manager import TerminalManager _DEFAULT_TIMEOUT = 20 _MAX_TIMEOUT = 300 diff --git a/navi/tools/terminal_manager.py b/navi/tools/terminal_manager.py deleted file mode 100644 index 5a89c7c..0000000 --- a/navi/tools/terminal_manager.py +++ /dev/null @@ -1,360 +0,0 @@ -"""Persistent terminal session manager. - -Owns named subprocess terminals keyed by (session_id, terminal_name). -Supports background long-running processes, streaming output via the -agent event sink, and automatic idle cleanup. -""" - -from __future__ import annotations - -import asyncio -import dataclasses -from collections import deque -from dataclasses import dataclass, field -from datetime import datetime, timezone -from pathlib import Path -from typing import TYPE_CHECKING, Any - -import structlog - -if TYPE_CHECKING: - from navi.core.events import AgentEvent - -log = structlog.get_logger() - -_MAX_OUTPUT_BUFFER = 500 # lines kept per terminal for status queries -_DEFAULT_CLEANUP_INTERVAL = 60 # seconds between idle checks -_DEFAULT_MAX_IDLE = 1800 # 30 minutes -_MAX_TERMINALS_PER_SESSION = 10 - - -@dataclass -class TerminalSession: - """A single named persistent terminal subprocess.""" - - name: str - description: str - session_id: str - command: str - cwd: Path | None = None - env: dict[str, str] | None = None - background: bool = False - proc: asyncio.subprocess.Process | None = None - stdin: asyncio.StreamWriter | None = None - stdout_task: asyncio.Task | None = None - stderr_task: asyncio.Task | None = None - output_buffer: deque[str] = field(default_factory=lambda: deque(maxlen=_MAX_OUTPUT_BUFFER)) - created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) - last_active: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) - closed: bool = False - close_reason: str | None = None - - def touch(self) -> None: - self.last_active = datetime.now(timezone.utc) - - def is_alive(self) -> bool: - if self.closed: - return False - if self.proc is None: - return False - return self.proc.returncode is None - - def summary(self) -> dict[str, Any]: - return { - "name": self.name, - "description": self.description, - "command": self.command, - "status": "busy" if self.is_alive() else "idle", - "pid": self.proc.pid if self.proc else None, - "cwd": str(self.cwd) if self.cwd else None, - "background": self.background, - "created_at": self.created_at.isoformat(), - "last_active": self.last_active.isoformat(), - "uptime_seconds": round((datetime.now(timezone.utc) - self.created_at).total_seconds(), 1), - } - - def status(self) -> dict[str, Any]: - out = self.summary() - out["output_tail"] = list(self.output_buffer)[-20:] - out["returncode"] = self.proc.returncode if self.proc else None - out["closed"] = self.closed - out["close_reason"] = self.close_reason - return out - - -class TerminalManager: - """Owns all persistent terminal sessions across all Navi sessions.""" - - def __init__(self, max_idle_seconds: int = _DEFAULT_MAX_IDLE) -> None: - self._sessions: dict[tuple[str, str], TerminalSession] = {} - self._max_idle = max_idle_seconds - self._cleanup_task: asyncio.Task | None = None - - # ── Lifecycle ──────────────────────────────────────────────────────────── - - def start(self) -> None: - if self._cleanup_task is None or self._cleanup_task.done(): - self._cleanup_task = asyncio.create_task(self._cleanup_loop()) - - async def shutdown(self) -> None: - if self._cleanup_task and not self._cleanup_task.done(): - self._cleanup_task.cancel() - try: - await self._cleanup_task - except asyncio.CancelledError: - pass - for key in list(self._sessions): - await self._close_one(key, reason="shutdown") - - async def _cleanup_loop(self) -> None: - while True: - try: - await asyncio.sleep(_DEFAULT_CLEANUP_INTERVAL) - self.cleanup_idle() - except asyncio.CancelledError: - break - except Exception: - log.exception("terminal_manager.cleanup_loop_error") - - # ── Public API ─────────────────────────────────────────────────────────── - - def list(self, session_id: str) -> list[dict[str, Any]]: - return [ - session.summary() - for (sid, name), session in self._sessions.items() - if sid == session_id and not session.closed - ] - - def status(self, session_id: str, name: str) -> dict[str, Any] | None: - session = self._sessions.get((session_id, name)) - if session is None or session.closed: - return None - session.touch() - return session.status() - - async def open( - self, - session_id: str, - name: str, - description: str, - command: str, - background: bool = False, - cwd: Path | None = None, - env: dict[str, str] | None = None, - timeout: int = 20, - event_sink: asyncio.Queue | None = None, - exec_tokens: list[str] | None = None, - ) -> TerminalSession: - """Open a new persistent terminal session. - - If *background* is True the process is started and the coroutine - returns immediately — output is streamed via *event_sink*. - If False, it waits for the process to finish and returns. - - Pass *exec_tokens* to run via ``create_subprocess_exec`` instead of - ``create_subprocess_shell`` (enforces allowlist restrictions). - """ - key = (session_id, name) - if key in self._sessions and not self._sessions[key].closed: - raise ValueError(f"Terminal '{name}' already exists for session {session_id}") - - session_count = sum( - 1 for (sid, _), s in self._sessions.items() - if sid == session_id and not s.closed - ) - if session_count >= _MAX_TERMINALS_PER_SESSION: - raise ValueError( - f"Max {_MAX_TERMINALS_PER_SESSION} terminals per session reached." - ) - - session = TerminalSession( - name=name, - description=description, - session_id=session_id, - command=command, - cwd=cwd, - env=env, - background=background, - ) - self._sessions[key] = session - - if exec_tokens: - proc = await asyncio.create_subprocess_exec( - *exec_tokens, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - stdin=asyncio.subprocess.PIPE if background else None, - cwd=cwd, - env=env, - ) - else: - proc = await asyncio.create_subprocess_shell( - command, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - stdin=asyncio.subprocess.PIPE if background else None, - cwd=cwd, - env=env, - ) - session.proc = proc - session.stdin = proc.stdin - - # Start background readers - session.stdout_task = asyncio.create_task( - self._read_stream(proc.stdout, session, "stdout", event_sink), - name=f"term-{session_id}-{name}-stdout", - ) - session.stderr_task = asyncio.create_task( - self._read_stream(proc.stderr, session, "stderr", event_sink), - name=f"term-{session_id}-{name}-stderr", - ) - - if not background: - # Wait for completion, gather output - try: - await asyncio.wait_for(proc.wait(), timeout=timeout) - except asyncio.TimeoutError: - proc.terminate() - try: - await asyncio.wait_for(proc.wait(), timeout=2.0) - except asyncio.TimeoutError: - proc.kill() - await proc.wait() - session.close_reason = "timeout" - # Ensure readers finish before returning so output_buffer is complete - await self._cancel_readers(session) - - session.touch() - return session - - async def send_input(self, session_id: str, name: str, text: str) -> bool: - session = self._sessions.get((session_id, name)) - if session is None or session.closed or session.stdin is None: - return False - try: - session.stdin.write(text.encode()) - await session.stdin.drain() - session.touch() - return True - except (BrokenPipeError, ConnectionResetError): - log.warning( - "terminal_manager.send_input_pipe_closed", - session_id=session_id, - name=name, - ) - return False - except Exception: - log.exception("terminal_manager.send_input_error", session_id=session_id, name=name) - return False - - async def close(self, session_id: str, name: str) -> bool: - key = (session_id, name) - if key not in self._sessions: - return False - await self._close_one(key, reason="explicit") - return True - - async def close_all(self, session_id: str) -> int: - """Close every terminal belonging to *session_id*. Returns count closed.""" - keys = [(sid, name) for (sid, name) in list(self._sessions) if sid == session_id] - for key in keys: - await self._close_one(key, reason="session_ended") - return len(keys) - - def cleanup_idle(self) -> None: - now = datetime.now(timezone.utc) - stale = [ - key - for key, session in list(self._sessions.items()) - if not session.closed and (now - session.last_active).total_seconds() > self._max_idle - ] - for key in stale: - asyncio.create_task( - self._close_one_safe(key, reason="idle_timeout"), - name=f"term-cleanup-{key[0]}-{key[1]}", - ) - - async def _close_one_safe(self, key: tuple[str, str], reason: str) -> None: - try: - await self._close_one(key, reason=reason) - except Exception: - log.exception("terminal_manager.cleanup_close_error", key=key) - - # ── Internals ──────────────────────────────────────────────────────────── - - async def _read_stream( - self, - stream: asyncio.StreamReader | None, - session: TerminalSession, - stream_name: str, - event_sink: asyncio.Queue | None, - ) -> None: - if stream is None: - return - try: - while True: - line = await stream.readline() - if not line: - break - text = line.decode(errors="replace") - session.output_buffer.append(text) - session.touch() - if event_sink is not None: - try: - from navi.core.events import TerminalOutputDelta - - await event_sink.put( - TerminalOutputDelta( - terminal_name=session.name, - stream=stream_name, - delta=text, - ) - ) - except Exception: - log.exception("terminal_manager.sink_put_error") - except asyncio.CancelledError: - pass - except Exception: - log.exception("terminal_manager.read_stream_error") - - async def _cancel_readers(self, session: TerminalSession) -> None: - for task in (session.stdout_task, session.stderr_task): - if task and not task.done(): - task.cancel() - try: - await task - except asyncio.CancelledError: - pass - - async def _close_one(self, key: tuple[str, str], reason: str) -> None: - session = self._sessions.pop(key, None) - if session is None: - return - if session.closed: - return - session.closed = True - session.close_reason = reason - - if session.proc and session.proc.returncode is None: - try: - session.proc.terminate() - await asyncio.wait_for(session.proc.wait(), timeout=2.0) - except asyncio.TimeoutError: - session.proc.kill() - try: - await asyncio.wait_for(session.proc.wait(), timeout=1.0) - except asyncio.TimeoutError: - pass - except Exception: - log.exception("terminal_manager.proc_kill_error", key=key) - - await self._cancel_readers(session) - - if session.stdin: - try: - session.stdin.close() - await session.stdin.wait_closed() - except Exception: - pass - - log.info("terminal_manager.closed", session_id=key[0], name=key[1], reason=reason) diff --git a/tests/unit/tools/test_terminal.py b/tests/unit/tools/test_terminal.py index 0aba56c..4e53acd 100644 --- a/tests/unit/tools/test_terminal.py +++ b/tests/unit/tools/test_terminal.py @@ -5,8 +5,7 @@ import pytest from navi.tools.terminal import TerminalTool -from navi.tools.terminal_manager import TerminalManager - +from navi.tools._internal.terminal_manager import TerminalManager, _MAX_TERMINALS_PER_SESSION class TestTerminalTool: @pytest.fixture @@ -185,7 +184,6 @@ from navi.tools._internal.base import ToolContext ctx = ToolContext(session_id="s6", stop_event=None, model="test") - from navi.tools.terminal_manager import _MAX_TERMINALS_PER_SESSION for i in range(_MAX_TERMINALS_PER_SESSION): result = await tool.execute(