Newer
Older
navi-1 / navi / tools / terminal_manager.py
"""Persistent terminal session manager.

Owns named subprocess terminals keyed by (session_id, terminal_name).
Supports background long-running processes, streaming output via the
agent event sink, and automatic idle cleanup.
"""

from __future__ import annotations

import asyncio
import dataclasses
import os
import signal
from collections import deque
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import TYPE_CHECKING, Any

import structlog

if TYPE_CHECKING:
    from navi.core.events import AgentEvent

# Module-level structlog logger used for all terminal_manager.* events.
log = structlog.get_logger()

_MAX_OUTPUT_BUFFER = 500  # lines kept per terminal (deque maxlen) for status queries
_DEFAULT_CLEANUP_INTERVAL = 60  # seconds between idle checks
_DEFAULT_MAX_IDLE = 1800  # 30 minutes; idle terminals older than this are closed


@dataclass
class TerminalSession:
    """A single named persistent terminal subprocess."""

    name: str
    description: str
    session_id: str
    command: str
    cwd: Path | None = None
    env: dict[str, str] | None = None
    background: bool = False
    proc: asyncio.subprocess.Process | None = None
    stdin: asyncio.StreamWriter | None = None
    stdout_task: asyncio.Task | None = None
    stderr_task: asyncio.Task | None = None
    output_buffer: deque[str] = field(default_factory=lambda: deque(maxlen=_MAX_OUTPUT_BUFFER))
    created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    last_active: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    closed: bool = False
    close_reason: str | None = None

    def touch(self) -> None:
        self.last_active = datetime.now(timezone.utc)

    def is_alive(self) -> bool:
        if self.closed:
            return False
        if self.proc is None:
            return False
        return self.proc.returncode is None

    def summary(self) -> dict[str, Any]:
        return {
            "name": self.name,
            "description": self.description,
            "command": self.command,
            "status": "busy" if self.is_alive() else "idle",
            "pid": self.proc.pid if self.proc else None,
            "cwd": str(self.cwd) if self.cwd else None,
            "background": self.background,
            "created_at": self.created_at.isoformat(),
            "last_active": self.last_active.isoformat(),
            "uptime_seconds": round((datetime.now(timezone.utc) - self.created_at).total_seconds(), 1),
        }

    def status(self) -> dict[str, Any]:
        out = self.summary()
        out["output_tail"] = list(self.output_buffer)[-20:]
        out["returncode"] = self.proc.returncode if self.proc else None
        out["closed"] = self.closed
        out["close_reason"] = self.close_reason
        return out


class TerminalManager:
    """Owns all persistent terminal sessions across all Navi sessions.

    Terminals are keyed by ``(session_id, terminal_name)``. A terminal is
    removed from the registry as soon as it is closed, so closed sessions do
    not accumulate and the name can be reused by a later :meth:`open`.
    """

    # On POSIX every terminal is started in its own session / process group so
    # that signals reach the whole tree spawned by ``sh -c`` (the shell plus
    # whatever it forks), not just the shell itself.
    _USE_PROCESS_GROUPS = os.name == "posix"
    # Upper bound (seconds) on how long a finished foreground command's reader
    # tasks may take to flush the remaining pipe output into output_buffer.
    _READER_DRAIN_TIMEOUT = 2.0

    def __init__(
        self,
        max_idle_seconds: int = _DEFAULT_MAX_IDLE,
        cleanup_interval: float = _DEFAULT_CLEANUP_INTERVAL,
    ) -> None:
        """
        Args:
            max_idle_seconds: terminals whose ``last_active`` is older than
                this are closed by :meth:`cleanup_idle`.
            cleanup_interval: seconds between idle checks in the background
                loop started by :meth:`start`.
        """
        self._sessions: dict[tuple[str, str], TerminalSession] = {}
        self._max_idle = max_idle_seconds
        self._cleanup_interval = cleanup_interval
        self._cleanup_task: asyncio.Task | None = None
        # Strong references to fire-and-forget idle-close tasks: the event loop
        # only keeps weak references, so untracked tasks can be garbage
        # collected before they finish.
        self._pending_closes: set[asyncio.Task] = set()

    # ── Lifecycle ────────────────────────────────────────────────────────────

    def start(self) -> None:
        """Start the periodic idle-cleanup loop unless it is already running."""
        if self._cleanup_task is None or self._cleanup_task.done():
            self._cleanup_task = asyncio.create_task(self._cleanup_loop())

    async def shutdown(self) -> None:
        """Stop the cleanup loop and close every terminal with reason ``"shutdown"``."""
        if self._cleanup_task and not self._cleanup_task.done():
            self._cleanup_task.cancel()
            try:
                await self._cleanup_task
            except asyncio.CancelledError:
                pass
        for session in list(self._sessions.values()):
            await self._close_session(session, reason="shutdown")
        # Idle-timeout closes already in flight have unregistered their session,
        # so wait for them explicitly before reporting shutdown complete.
        if self._pending_closes:
            await asyncio.gather(*self._pending_closes, return_exceptions=True)

    async def _cleanup_loop(self) -> None:
        """Call :meth:`cleanup_idle` every ``cleanup_interval`` seconds until cancelled."""
        while True:
            # Cancellation from shutdown() propagates out of the sleep.
            await asyncio.sleep(self._cleanup_interval)
            try:
                self.cleanup_idle()
            except Exception:
                log.exception("terminal_manager.cleanup_loop_error")

    # ── Public API ───────────────────────────────────────────────────────────

    def list(self, session_id: str) -> list[dict[str, Any]]:
        """Return summaries of the open terminals belonging to *session_id*."""
        return [
            session.summary()
            for (sid, _name), session in self._sessions.items()
            if sid == session_id and not session.closed
        ]

    def status(self, session_id: str, name: str) -> dict[str, Any] | None:
        """Return detailed status for one terminal (and mark it active), or None if unknown/closed."""
        session = self._sessions.get((session_id, name))
        if session is None or session.closed:
            return None
        session.touch()
        return session.status()

    async def open(
        self,
        session_id: str,
        name: str,
        description: str,
        command: str,
        background: bool = False,
        cwd: Path | None = None,
        env: dict[str, str] | None = None,
        timeout: int = 20,
        event_sink: asyncio.Queue | None = None,
    ) -> TerminalSession:
        """Open a new persistent terminal session running *command* in a shell.

        If *background* is True the process is started with a writable stdin
        (see :meth:`send_input`) and the coroutine returns immediately; output
        keeps streaming into the session buffer and, if given, *event_sink*.
        If False, it waits up to *timeout* seconds for the process to finish
        (killing it and setting ``close_reason = "timeout"`` if it does not),
        then gives the output readers a short grace period to drain so the
        returned session's ``output_buffer`` holds the complete output.

        Raises:
            ValueError: a live terminal named *name* already exists for
                *session_id*.
            Exception: anything raised while spawning the shell (e.g. an
                ``OSError`` for a missing *cwd*) is re-raised after the name
                reservation is released, so a later ``open`` can retry.
        """
        key = (session_id, name)
        existing = self._sessions.get(key)
        if existing is not None and not existing.closed:
            raise ValueError(f"Terminal '{name}' already exists for session {session_id}")

        session = TerminalSession(
            name=name,
            description=description,
            session_id=session_id,
            command=command,
            cwd=cwd,
            env=env,
            background=background,
        )
        # Reserve the name before the first await so a concurrent open() for
        # the same key is rejected instead of spawning a duplicate process.
        self._sessions[key] = session

        try:
            proc = await asyncio.create_subprocess_shell(
                command,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
                # Foreground commands get an empty stdin instead of inheriting
                # Navi's own stdin, so interactive prompts see EOF rather than
                # blocking (or consuming our input) until the timeout.
                stdin=asyncio.subprocess.PIPE if background else asyncio.subprocess.DEVNULL,
                cwd=cwd,
                env=env,
                start_new_session=self._USE_PROCESS_GROUPS,
            )
        except BaseException:
            # Spawn failed or we were cancelled: release the reservation,
            # otherwise the name would report "already exists" indefinitely.
            session.closed = True
            session.close_reason = "spawn_failed"
            if self._sessions.get(key) is session:
                del self._sessions[key]
            raise

        session.proc = proc
        session.stdin = proc.stdin

        # Start background readers
        session.stdout_task = asyncio.create_task(
            self._read_stream(proc.stdout, session, "stdout", event_sink),
            name=f"term-{session_id}-{name}-stdout",
        )
        session.stderr_task = asyncio.create_task(
            self._read_stream(proc.stderr, session, "stderr", event_sink),
            name=f"term-{session_id}-{name}-stderr",
        )

        if not background:
            # Wait for completion, gather output
            try:
                await asyncio.wait_for(proc.wait(), timeout=timeout)
            except asyncio.TimeoutError:
                self._signal_process(proc, force=True)
                await proc.wait()
                session.close_reason = "timeout"
            # proc.wait() can return before the readers have consumed the tail
            # of stdout/stderr. Bound the wait: a process left behind by the
            # command may still hold the pipes open.
            await asyncio.wait(
                {session.stdout_task, session.stderr_task},
                timeout=self._READER_DRAIN_TIMEOUT,
            )
            # Foreground terminal stays in the manager so status/list work;
            # idle cleanup will eventually close it.

        session.touch()
        return session

    async def send_input(self, session_id: str, name: str, text: str) -> bool:
        """Write *text* (UTF-8 encoded) to a terminal's stdin.

        Returns False if the terminal is unknown, closed, has no stdin
        (foreground terminals), has already exited, or the write fails.
        """
        session = self._sessions.get((session_id, name))
        if session is None or session.stdin is None or not session.is_alive():
            return False
        try:
            session.stdin.write(text.encode())
            await session.stdin.drain()
            session.touch()
            return True
        except Exception:
            log.exception("terminal_manager.send_input_error", session_id=session_id, name=name)
            return False

    async def close(self, session_id: str, name: str) -> bool:
        """Close one terminal. Returns True if an open terminal was closed."""
        session = self._sessions.get((session_id, name))
        if session is None or session.closed:
            return False
        await self._close_session(session, reason="explicit")
        return True

    async def close_all(self, session_id: str) -> int:
        """Close every open terminal belonging to *session_id*. Returns count closed."""
        victims = [
            session
            for (sid, _name), session in list(self._sessions.items())
            if sid == session_id and not session.closed
        ]
        for session in victims:
            await self._close_session(session, reason="session_ended")
        return len(victims)

    def cleanup_idle(self) -> None:
        """Schedule closing of terminals idle longer than ``max_idle_seconds``.

        Must be called with a running event loop (it uses
        ``asyncio.create_task``). The scheduled tasks are tracked so they are
        not garbage collected early and so :meth:`shutdown` can await them.
        """
        now = datetime.now(timezone.utc)
        for session in list(self._sessions.values()):
            if session.closed:
                continue
            if (now - session.last_active).total_seconds() <= self._max_idle:
                continue
            # Close by session object, not key: if the name is closed and
            # reopened before the task runs, the new terminal must survive.
            task = asyncio.create_task(self._close_session(session, reason="idle_timeout"))
            self._pending_closes.add(task)
            task.add_done_callback(self._pending_closes.discard)

    # ── Internals ────────────────────────────────────────────────────────────

    async def _read_stream(
        self,
        stream: asyncio.StreamReader | None,
        session: TerminalSession,
        stream_name: str,
        event_sink: asyncio.Queue | None,
    ) -> None:
        """Copy lines from *stream* into the session buffer and *event_sink* until EOF."""
        if stream is None:
            return
        try:
            while True:
                try:
                    line = await stream.readline()
                except ValueError:
                    # readline() raises ValueError when one line exceeds the
                    # StreamReader buffer limit; the oversized data has already
                    # been discarded. Keep draining: abandoning the pipe would
                    # eventually block the child on a full pipe.
                    log.warning(
                        "terminal_manager.line_too_long",
                        terminal=session.name,
                        stream=stream_name,
                    )
                    continue
                if not line:
                    break
                text = line.decode(errors="replace")
                session.output_buffer.append(text)
                session.touch()
                if event_sink is not None:
                    try:
                        from navi.core.events import TerminalOutputDelta

                        await event_sink.put(
                            TerminalOutputDelta(
                                terminal_name=session.name,
                                stream=stream_name,
                                delta=text,
                            )
                        )
                    except Exception:
                        log.exception("terminal_manager.sink_put_error")
        except asyncio.CancelledError:
            pass
        except Exception:
            log.exception("terminal_manager.read_stream_error")

    async def _cancel_readers(self, session: TerminalSession) -> None:
        """Cancel the session's stdout/stderr reader tasks and wait for them to finish."""
        for task in (session.stdout_task, session.stderr_task):
            if task and not task.done():
                task.cancel()
                try:
                    await task
                except asyncio.CancelledError:
                    pass

    @classmethod
    def _signal_process(cls, proc: asyncio.subprocess.Process, *, force: bool) -> None:
        """Send SIGTERM (SIGKILL if *force*) to *proc*; on POSIX to its whole process group.

        A process (group) that no longer exists is silently ignored.
        """
        try:
            if cls._USE_PROCESS_GROUPS:
                # open() used start_new_session=True, making the shell a group
                # leader whose pid is also the process-group id.
                os.killpg(proc.pid, signal.SIGKILL if force else signal.SIGTERM)
            elif force:
                proc.kill()
            else:
                proc.terminate()
        except ProcessLookupError:
            pass

    async def _close_one(self, key: tuple[str, str], reason: str) -> None:
        """Close the terminal registered under *key*, if any (no-op otherwise)."""
        session = self._sessions.get(key)
        if session is not None:
            await self._close_session(session, reason)

    async def _close_session(self, session: TerminalSession, reason: str) -> None:
        """Stop *session*'s process tree, reader tasks and stdin, and unregister it.

        Idempotent: an already-closed session is left untouched.
        """
        if session.closed:
            return
        session.closed = True
        session.close_reason = reason
        key = (session.session_id, session.name)
        # Unregister immediately (only if the slot still holds *this* session)
        # so closed sessions don't accumulate and the name can be reused.
        if self._sessions.get(key) is session:
            del self._sessions[key]

        proc = session.proc
        if proc is not None and proc.returncode is None:
            try:
                # Ask politely first, then escalate to SIGKILL.
                self._signal_process(proc, force=False)
                try:
                    await asyncio.wait_for(proc.wait(), timeout=5.0)
                except asyncio.TimeoutError:
                    self._signal_process(proc, force=True)
                    try:
                        await asyncio.wait_for(proc.wait(), timeout=2.0)
                    except asyncio.TimeoutError:
                        log.warning(
                            "terminal_manager.proc_kill_timeout",
                            session_id=key[0],
                            name=key[1],
                        )
            except Exception:
                log.exception("terminal_manager.proc_kill_error", key=key)

        await self._cancel_readers(session)

        if session.stdin is not None:
            try:
                session.stdin.close()
                await session.stdin.wait_closed()
            except Exception:
                # Best effort: the pipe is often already broken because the
                # process has just exited.
                log.debug(
                    "terminal_manager.stdin_close_error",
                    session_id=key[0],
                    name=key[1],
                    exc_info=True,
                )

        log.info("terminal_manager.closed", session_id=key[0], name=key[1], reason=reason)