"""Persistent terminal session manager.
Owns named subprocess terminals keyed by (session_id, terminal_name).
Supports background long-running processes, streaming output via the
agent event sink, and automatic idle cleanup.
"""
from __future__ import annotations
import asyncio
import dataclasses
from collections import deque
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import TYPE_CHECKING, Any
import structlog
if TYPE_CHECKING:
from navi.core.events import AgentEvent
log = structlog.get_logger()

# Upper bound on output lines retained per terminal for status queries;
# older lines fall off the front of the buffer.
_MAX_OUTPUT_BUFFER = 500
# Seconds the background cleanup loop sleeps between idle sweeps.
_DEFAULT_CLEANUP_INTERVAL = 60
# Seconds of inactivity after which a terminal is closed (30 minutes).
_DEFAULT_MAX_IDLE = 30 * 60
@dataclass
class TerminalSession:
    """State for one named, persistent terminal subprocess.

    Bundles the process handle, its stdin writer, the stdout/stderr reader
    tasks, a bounded tail of output lines, and the timestamps used for idle
    cleanup.
    """

    name: str
    description: str
    session_id: str
    command: str
    cwd: Path | None = None
    env: dict[str, str] | None = None
    background: bool = False
    proc: asyncio.subprocess.Process | None = None
    stdin: asyncio.StreamWriter | None = None
    stdout_task: asyncio.Task | None = None
    stderr_task: asyncio.Task | None = None
    # Bounded deque: once _MAX_OUTPUT_BUFFER lines are held, the oldest drop off.
    output_buffer: deque[str] = field(default_factory=lambda: deque(maxlen=_MAX_OUTPUT_BUFFER))
    created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    last_active: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    closed: bool = False
    close_reason: str | None = None

    def touch(self) -> None:
        """Stamp ``last_active`` with the current UTC time."""
        self.last_active = datetime.now(tz=timezone.utc)

    def is_alive(self) -> bool:
        """Return True only if the terminal is open and its process is still running."""
        return (not self.closed) and self.proc is not None and self.proc.returncode is None

    def summary(self) -> dict[str, Any]:
        """Return a JSON-serialisable overview of this terminal."""
        proc = self.proc
        cwd = self.cwd
        running = self.is_alive()
        age = datetime.now(timezone.utc) - self.created_at
        return dict(
            name=self.name,
            description=self.description,
            command=self.command,
            status="busy" if running else "idle",
            pid=proc.pid if proc else None,
            cwd=str(cwd) if cwd else None,
            background=self.background,
            created_at=self.created_at.isoformat(),
            last_active=self.last_active.isoformat(),
            uptime_seconds=round(age.total_seconds(), 1),
        )

    def status(self) -> dict[str, Any]:
        """Return the summary plus the last 20 output lines and exit details."""
        details = self.summary()
        details.update(
            output_tail=[*self.output_buffer][-20:],
            returncode=self.proc.returncode if self.proc else None,
            closed=self.closed,
            close_reason=self.close_reason,
        )
        return details
class TerminalManager:
    """Owns all persistent terminal sessions across all Navi sessions.

    Terminals are keyed by ``(session_id, name)``. Once :meth:`start` has been
    called, a background task closes every terminal whose ``last_active``
    timestamp is older than ``max_idle_seconds``. Closed terminals are removed
    from the manager so they do not accumulate for the life of the process.
    """

    # Upper bound (seconds) on how long a foreground open() waits, after the
    # process exits, for the reader tasks to drain the remaining output.
    _OUTPUT_DRAIN_TIMEOUT = 2.0

    def __init__(self, max_idle_seconds: int = _DEFAULT_MAX_IDLE) -> None:
        self._sessions: dict[tuple[str, str], TerminalSession] = {}
        self._max_idle = max_idle_seconds
        self._cleanup_task: asyncio.Task | None = None
        # Strong references to close tasks spawned by cleanup_idle(). The event
        # loop keeps only weak references to tasks, so an unreferenced task
        # can be garbage-collected before it finishes.
        self._pending_closes: set[asyncio.Task] = set()

    # ── Lifecycle ────────────────────────────────────────────────────────────

    def start(self) -> None:
        """Start the periodic idle-cleanup task; no-op if it is already running.

        Must be called while an event loop is running (uses asyncio.create_task).
        """
        if self._cleanup_task is None or self._cleanup_task.done():
            self._cleanup_task = asyncio.create_task(self._cleanup_loop())

    async def shutdown(self) -> None:
        """Stop idle cleanup, then close every terminal owned by the manager."""
        if self._cleanup_task and not self._cleanup_task.done():
            self._cleanup_task.cancel()
            try:
                await self._cleanup_task
            except asyncio.CancelledError:
                pass
        # Let idle closes already in flight finish before the final sweep.
        if self._pending_closes:
            await asyncio.gather(*self._pending_closes, return_exceptions=True)
        for key in list(self._sessions):
            await self._close_one(key, reason="shutdown")

    async def _cleanup_loop(self) -> None:
        """Call cleanup_idle() every _DEFAULT_CLEANUP_INTERVAL seconds until cancelled."""
        while True:
            # Cancellation raised here propagates; shutdown() expects and absorbs it.
            await asyncio.sleep(_DEFAULT_CLEANUP_INTERVAL)
            try:
                self.cleanup_idle()
            except Exception:
                log.exception("terminal_manager.cleanup_loop_error")

    # ── Public API ───────────────────────────────────────────────────────────

    def list(self, session_id: str) -> list[dict[str, Any]]:
        """Return summaries of the open terminals belonging to *session_id*."""
        return [
            session.summary()
            for (sid, _name), session in self._sessions.items()
            if sid == session_id and not session.closed
        ]

    def status(self, session_id: str, name: str) -> dict[str, Any] | None:
        """Return detailed status for one terminal, or None if unknown/closed.

        Querying status counts as activity and resets the idle timer.
        """
        session = self._sessions.get((session_id, name))
        if session is None or session.closed:
            return None
        session.touch()
        return session.status()

    async def open(
        self,
        session_id: str,
        name: str,
        description: str,
        command: str,
        background: bool = False,
        cwd: Path | None = None,
        env: dict[str, str] | None = None,
        timeout: int = 20,
        event_sink: asyncio.Queue | None = None,
    ) -> TerminalSession:
        """Open a new persistent terminal session running *command* in a shell.

        If *background* is True the process gets a stdin pipe and the
        coroutine returns right after spawning; output keeps streaming into
        the session buffer and, when given, *event_sink*.

        If False, it waits up to *timeout* seconds for the process to exit
        (killing it and setting ``close_reason = "timeout"`` otherwise), then
        waits a bounded time for the remaining output to be read before
        returning. The terminal stays registered so status/list still work;
        idle cleanup eventually closes it.

        Raises:
            ValueError: an open terminal named *name* already exists for
                *session_id*.
            Exception: whatever ``asyncio.create_subprocess_shell`` raises
                (e.g. an OSError for a missing *cwd*). The name is released so
                it can be reused.
        """
        key = (session_id, name)
        existing = self._sessions.get(key)
        if existing is not None and not existing.closed:
            raise ValueError(f"Terminal '{name}' already exists for session {session_id}")
        session = TerminalSession(
            name=name,
            description=description,
            session_id=session_id,
            command=command,
            cwd=cwd,
            env=env,
            background=background,
        )
        # Reserve the key before the first await so a concurrent open() with
        # the same name is rejected instead of racing this one.
        self._sessions[key] = session
        try:
            proc = await asyncio.create_subprocess_shell(
                command,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
                # NOTE(review): None makes a foreground command inherit this
                # process's stdin; consider DEVNULL if that is not intended.
                stdin=asyncio.subprocess.PIPE if background else None,
                cwd=cwd,
                env=env,
            )
        except BaseException:
            # The spawn failed, so release the reservation. Otherwise a dead,
            # non-closed entry would block this name until idle cleanup.
            session.closed = True
            session.close_reason = "spawn_failed"
            if self._sessions.get(key) is session:
                del self._sessions[key]
            raise
        session.proc = proc
        session.stdin = proc.stdin
        # Start background readers.
        session.stdout_task = asyncio.create_task(
            self._read_stream(proc.stdout, session, "stdout", event_sink),
            name=f"term-{session_id}-{name}-stdout",
        )
        session.stderr_task = asyncio.create_task(
            self._read_stream(proc.stderr, session, "stderr", event_sink),
            name=f"term-{session_id}-{name}-stderr",
        )
        if not background:
            try:
                await asyncio.wait_for(proc.wait(), timeout=timeout)
            except asyncio.TimeoutError:
                try:
                    proc.kill()
                except ProcessLookupError:
                    pass  # exited between the timeout firing and kill()
                await proc.wait()
                session.close_reason = "timeout"
            # proc.wait() returning does not mean the pipes are drained. Give
            # the readers a bounded window to collect the tail of the output.
            # The window is bounded because a grandchild that inherited the
            # pipes can keep them open indefinitely.
            await asyncio.wait(
                [session.stdout_task, session.stderr_task],
                timeout=self._OUTPUT_DRAIN_TIMEOUT,
            )
        session.touch()
        return session

    async def send_input(self, session_id: str, name: str, text: str) -> bool:
        """Write *text* to the terminal's stdin.

        Returns False if the terminal is unknown, closed, or has no stdin pipe
        (foreground terminals), or if the write fails.
        """
        session = self._sessions.get((session_id, name))
        if session is None or session.closed or session.stdin is None:
            return False
        try:
            session.stdin.write(text.encode())
            await session.stdin.drain()
            session.touch()
            return True
        except Exception:
            log.exception("terminal_manager.send_input_error", session_id=session_id, name=name)
            return False

    async def close(self, session_id: str, name: str) -> bool:
        """Close one terminal. Returns True only if this call closed it."""
        key = (session_id, name)
        session = self._sessions.get(key)
        if session is None or session.closed:
            return False
        return await self._close_one(key, reason="explicit")

    async def close_all(self, session_id: str) -> int:
        """Close every open terminal belonging to *session_id*. Returns count closed."""
        # Snapshot the keys first: _close_one awaits and mutates the dict.
        keys = [
            key
            for key, session in self._sessions.items()
            if key[0] == session_id and not session.closed
        ]
        closed = 0
        for key in keys:
            if await self._close_one(key, reason="session_ended"):
                closed += 1
        return closed

    def cleanup_idle(self) -> None:
        """Schedule closure of every open terminal idle longer than max_idle_seconds.

        The closes run as tasks on the current event loop. The tasks are kept
        referenced until they finish.
        """
        now = datetime.now(timezone.utc)
        stale = [
            key
            for key, session in self._sessions.items()
            if not session.closed and (now - session.last_active).total_seconds() > self._max_idle
        ]
        for key in stale:
            task = asyncio.create_task(self._close_one(key, reason="idle_timeout"))
            self._pending_closes.add(task)
            task.add_done_callback(self._pending_closes.discard)

    # ── Internals ────────────────────────────────────────────────────────────

    @staticmethod
    async def _next_chunk(stream: asyncio.StreamReader) -> bytes:
        """Return the next newline-terminated line, or a limit-sized piece of an over-long line.

        Returns b"" at EOF. Unlike ``readline()``, this neither discards data
        nor raises when a line exceeds the reader's buffer limit.
        """
        try:
            return await stream.readuntil(b"\n")
        except asyncio.IncompleteReadError as exc:
            # EOF before a newline: the trailing partial line (b"" once drained).
            return exc.partial
        except asyncio.LimitOverrunError as exc:
            # readline() would discard this data and raise ValueError. That
            # ends the reader, the pipe fills up, and the child blocks forever.
            # The data is still buffered, so hand back this piece; the rest
            # arrives on the next call.
            return await stream.read(exc.consumed)

    async def _read_stream(
        self,
        stream: asyncio.StreamReader | None,
        session: TerminalSession,
        stream_name: str,
        event_sink: asyncio.Queue | None,
    ) -> None:
        """Pump *stream* into the session until EOF.

        Each chunk (normally one line) is decoded with replacement of invalid
        bytes and appended to ``session.output_buffer``. It also refreshes the
        idle timer and, when *event_sink* is given, is forwarded as a
        ``TerminalOutputDelta``. Errors are logged, never raised.
        """
        if stream is None:
            return
        try:
            while True:
                chunk = await self._next_chunk(stream)
                if not chunk:
                    break  # EOF
                text = chunk.decode(errors="replace")
                session.output_buffer.append(text)
                session.touch()
                if event_sink is not None:
                    try:
                        # Imported lazily, presumably to avoid an import cycle
                        # with navi.core.events (cf. the TYPE_CHECKING import).
                        from navi.core.events import TerminalOutputDelta
                        await event_sink.put(
                            TerminalOutputDelta(
                                terminal_name=session.name,
                                stream=stream_name,
                                delta=text,
                            )
                        )
                    except Exception:
                        log.exception("terminal_manager.sink_put_error")
        except asyncio.CancelledError:
            pass  # cancelled by _cancel_readers() when the terminal closes
        except Exception:
            log.exception(
                "terminal_manager.read_stream_error",
                terminal=session.name,
                stream=stream_name,
            )

    async def _cancel_readers(self, session: TerminalSession) -> None:
        """Cancel the session's stdout/stderr reader tasks and wait for them to end."""
        for task in (session.stdout_task, session.stderr_task):
            if task and not task.done():
                task.cancel()
                try:
                    await task
                except asyncio.CancelledError:
                    pass

    async def _stop_process(self, proc: asyncio.subprocess.Process, key: tuple[str, str]) -> None:
        """Stop *proc*: send terminate(), wait up to 5s, then kill() and wait up to 2s."""
        try:
            proc.terminate()
            try:
                await asyncio.wait_for(proc.wait(), timeout=5.0)
                return
            except asyncio.TimeoutError:
                pass
            proc.kill()
            try:
                await asyncio.wait_for(proc.wait(), timeout=2.0)
            except asyncio.TimeoutError:
                log.warning("terminal_manager.proc_kill_timeout", key=key)
        except ProcessLookupError:
            pass  # the process exited on its own in the meantime
        except Exception:
            log.exception("terminal_manager.proc_kill_error", key=key)

    async def _close_one(self, key: tuple[str, str], reason: str) -> bool:
        """Close the terminal at *key*, recording *reason*.

        Stops the process if it is still running, cancels the readers, closes
        stdin, and removes the entry from the manager. Returns True if this
        call closed it, False if it was unknown or already closed.
        """
        session = self._sessions.get(key)
        if session is None or session.closed:
            return False
        # Mark closed before the first await so concurrent closes become no-ops.
        session.closed = True
        session.close_reason = reason
        proc = session.proc
        if proc is not None and proc.returncode is None:
            await self._stop_process(proc, key)
        await self._cancel_readers(session)
        if session.stdin:
            try:
                session.stdin.close()
                await session.stdin.wait_closed()
            except Exception:
                pass  # best effort: the pipe is often already broken once the child is gone
        # Drop the entry so closed terminals and their buffers do not pile up.
        # Remove it only if it is still ours: open() may have reused the name
        # while this close was awaiting.
        if self._sessions.get(key) is session:
            del self._sessions[key]
        log.info("terminal_manager.closed", session_id=key[0], name=key[1], reason=reason)
        return True