Newer
Older
navi-1 / clients / terminal / tui / tui_app.py
"""Textual TUI application for Navi Code."""

from __future__ import annotations

from textual.app import App, ComposeResult
from textual.containers import Horizontal
from textual.widgets import Footer, Header

from clients.terminal import api
from clients.terminal.config import settings
from clients.terminal.state import StateManager
from clients.terminal.tui.context import TuiContext
from clients.terminal.tui.events import (
    ConnectionStatusChanged,
    PermissionRequest,
    UserSubmitted,
    WsEvent,
)
from clients.terminal.tui.file_refs import FileRefResolver
from clients.terminal.tui.permissions import PermissionEngine, PermissionRule
from clients.terminal.tui.shell_runner import run_shell_command
from clients.terminal.tui.themes import ThemeRegistry, set_active_theme
from clients.terminal.tui.commands.registry import get_registry
from clients.terminal.tui.screens.command_palette import CommandPaletteScreen
from clients.terminal.tui.screens.permission_dialog import PermissionDialogScreen
from clients.terminal.tui.widgets import ChatPanel, InputBox, StatusPanel
from clients.terminal.tui.ws_bridge import WsBridge


class NaviCodeTui(App):
    """OpenCode-inspired terminal UI for Navi."""

    BINDINGS = [
        ("ctrl+p", "command_palette", "Palette"),
        ("ctrl+x q", "quit", "Quit"),
        ("ctrl+x n", "new_session", "New"),
        ("ctrl+x l", "list_sessions", "Sessions"),
        ("ctrl+x c", "compact", "Compact"),
        ("ctrl+x t", "toggle_thinking", "Thinking"),
    ]

    def __init__(
        self,
        session_id: str | None = None,
        profile_id: str | None = None,
        new_session: bool = False,
        theme_name: str = "gnexus-dark",
    ) -> None:
        self._theme_name = theme_name
        super().__init__()
        self._register_textual_themes()
        self.theme = self._theme_name
        set_active_theme(self._theme_name)
        self._chat_panel = ChatPanel()
        self._status_panel = StatusPanel()
        self._input_box = InputBox()
        self._state = StateManager()
        self._ctx = TuiContext(
            state=self._state,
            chat_panel=self._chat_panel,
            status_panel=self._status_panel,
        )
        self._bridge: WsBridge | None = None
        self._permission_engine = PermissionEngine()
        self._pending_permission: PermissionRequest | None = None
        self._requested_session_id = session_id
        self._requested_profile_id = profile_id
        self._force_new_session = new_session

    def compose(self) -> ComposeResult:
        yield Header(show_clock=False)
        with Horizontal():
            yield self._chat_panel
            yield self._status_panel
        yield self._input_box
        yield Footer()

    def on_mount(self) -> None:
        self._apply_theme()
        self.run_worker(self._startup)

    def _register_textual_themes(self) -> None:
        """Register every Navi theme as a Textual theme so $tui-* variables resolve."""
        for name in ThemeRegistry.all():
            self.register_theme(ThemeRegistry.get(name).to_textual_theme())

    def _apply_theme(self) -> None:
        """Activate the selected theme and update global active theme state."""
        set_active_theme(self._theme_name)
        self.theme = self._theme_name

    async def _startup(self) -> None:
        session_id = await self._resolve_session(
            self._requested_session_id,
            self._requested_profile_id,
            self._force_new_session,
        )
        if session_id:
            await self._attach_session(session_id)
        self._input_box.focus_input()

    async def _resolve_session(
        self,
        session_id: str | None,
        profile_id: str | None,
        force_new: bool,
    ) -> str | None:
        if session_id and not force_new:
            try:
                session = api.get_session(session_id)
                return session["session_id"]
            except Exception:
                pass

        if not force_new:
            saved = self._state.get_session_id()
            if saved:
                try:
                    session = api.get_session(saved)
                    return session["session_id"]
                except Exception:
                    self._state.clear_session_id()

        profile = profile_id or settings.default_profile_id
        try:
            session = api.create_session(profile)
        except Exception as exc:
            self._chat_panel.handle_ws_event({"type": "error", "message": f"Failed to create session: {exc}"})
            return None
        self._state.set_session_id(session["session_id"])
        return session["session_id"]

    async def _attach_session(self, session_id: str) -> None:
        self._ctx.session_id = session_id
        try:
            session = api.get_session(session_id)
            self._ctx.profile_id = session.get("profile_id") or settings.default_profile_id
        except Exception:
            self._ctx.profile_id = settings.default_profile_id

        self._status_panel.set_session(session_id)
        self._status_panel.set_profile(self._ctx.profile_id)
        self._status_panel.set_model(settings.ollama_default_model if hasattr(settings, "ollama_default_model") else "unknown")

        if self._bridge:
            await self._bridge.stop()
        self._bridge = WsBridge(self, session_id)
        await self._bridge.start()
        self._ctx.ws_client = self._bridge.client
        self._chat_panel.handle_ws_event({"type": "status", "content": f"Connected to {session_id[:8]}"})

    def on_user_submitted(self, event: UserSubmitted) -> None:
        text = event.text
        if text.startswith("/"):
            self._run_command(text)
            return

        if text.startswith("!"):
            self._run_shell_command(text)
            return

        resolved = FileRefResolver().resolve(text)
        self._chat_panel.add_user_message(resolved.prompt)
        if resolved.attachments:
            names = ", ".join(a.display_path + (" (truncated)" if a.truncated else "") for a in resolved.attachments)
            self._chat_panel.handle_ws_event({"type": "status", "content": f"Attached: {names}"})
        for err in resolved.errors:
            self._chat_panel.handle_ws_event({"type": "error", "message": err})

        if self._bridge and self._bridge.connected:
            self._bridge.client.enqueue(resolved.to_message())
        else:
            self._chat_panel.handle_ws_event({"type": "error", "message": "Not connected to a session"})

    def _run_shell_command(self, text: str) -> None:
        command = text[1:].strip()
        args = {"action": "run", "command": command}
        if self._permission_engine.is_always_deny("shell", args):
            self._chat_panel.handle_ws_event({"type": "error", "message": f"Shell command denied by policy: {command}"})
            return
        if self._permission_engine.check("shell", args) is None:
            self.run_worker(self._shell_worker(text))
            return
        self._confirm_shell_command(text)

    def _confirm_shell_command(self, text: str) -> None:
        command = text[1:].strip()

        def on_decision(choice: str | None) -> None:
            if choice == "allow_once":
                self.run_worker(self._shell_worker(text))
            elif choice == "allow_always":
                self._permission_engine.set_always_allow("shell", {"action": "run", "command": command})
                self.run_worker(self._shell_worker(text))
            elif choice == "deny_once":
                self._chat_panel.handle_ws_event({"type": "error", "message": f"Shell command cancelled: {command}"})
            elif choice == "deny_always":
                self._permission_engine.set_always_deny("shell", {"action": "run", "command": command})
                self._chat_panel.handle_ws_event({"type": "error", "message": f"Shell command cancelled: {command}"})
            else:
                self._chat_panel.handle_ws_event({"type": "error", "message": f"Shell command cancelled: {command}"})

        self.push_screen(
            PermissionDialogScreen(
                tool="shell",
                action="run",
                target=command,
                details="Local shell command",
            ),
            callback=on_decision,
        )

    async def _shell_worker(self, text: str) -> None:
        result = run_shell_command(text)
        self._chat_panel.handle_ws_event({"type": "status", "content": result.summary()})

    def _run_command(self, text: str) -> None:
        parts = text[1:].split(None, 1)
        name = parts[0].lower()
        args = parts[1] if len(parts) > 1 else ""
        registry = get_registry()
        cmd = registry.get(name)
        if cmd is None:
            self._chat_panel.handle_ws_event({"type": "error", "message": f"Unknown command: /{name}"})
            return
        self.run_worker(self._command_worker(cmd, args))

    async def _command_worker(self, cmd, args: str) -> None:
        await cmd.execute(self._ctx, args)

    def on_ws_event(self, event: WsEvent) -> None:
        payload = event.payload
        if payload.get("type") == "tool_started":
            tool = payload.get("tool", "")
            args = payload.get("args") or {}
            if self._permission_engine.is_always_deny(tool, args):
                self._deny_tool(tool, args)
                return
            rule = self._permission_engine.check(tool, args)
            if rule is not None:
                self._show_permission_dialog(payload, rule)
                return
        self._chat_panel.handle_ws_event(payload)

    def _show_permission_dialog(self, payload: dict, rule) -> None:
        tool = payload.get("tool", "?")
        args = payload.get("args") or {}
        action = args.get("action", "")
        target = self._permission_engine.extract_target(tool, args)

        def on_decision(choice: str | None) -> None:
            if choice == "allow_once":
                self._chat_panel.handle_ws_event(payload)
            elif choice == "allow_always":
                self._permission_engine.set_always_allow(tool, args)
                self._chat_panel.handle_ws_event(payload)
            elif choice == "deny_once":
                self._deny_tool(tool, args)
            elif choice == "deny_always":
                self._permission_engine.set_always_deny(tool, args)
                self._deny_tool(tool, args)
            else:
                # Dismissed / escape — treat as deny once.
                self._deny_tool(tool, args)

        self.push_screen(
            PermissionDialogScreen(
                tool=tool,
                action=action,
                target=target,
                details=rule.message,
            ),
            callback=on_decision,
        )

    def _deny_tool(self, tool: str, args: dict) -> None:
        # Render a synthetic tool result so the user sees the denial, then stop
        # the session because the backend is already executing the tool and the
        # TUI cannot inject a result into the running tool-call loop.
        self._chat_panel.handle_ws_event(
            {
                "type": "tool_call",
                "tool": tool,
                "args": args,
                "success": False,
                "result": "permission denied by user",
            }
        )
        if self._ctx.session_id:
            self.run_worker(self._stop_session_worker(self._ctx.session_id))

    async def _stop_session_worker(self, session_id: str) -> None:
        try:
            api.stop_session(session_id)
        except Exception as exc:
            self._chat_panel.handle_ws_event({"type": "error", "message": f"Failed to stop session: {exc}"})
        if self._bridge:
            await self._bridge.stop()
        self._status_panel.set_connection(False, "permission denied")

    def on_connection_status_changed(self, event: ConnectionStatusChanged) -> None:
        self._status_panel.set_connection(event.connected, event.detail)

    def on_permission_request(self, event: PermissionRequest) -> None:
        """Manual PermissionRequest event (fallback from components)."""
        tool = event.tool
        args = event.args
        payload = {"type": "tool_started", "tool": tool, "args": args}
        rule = PermissionRule(tool=tool, message=event.message)
        self._show_permission_dialog(payload, rule)

    def action_command_palette(self) -> None:
        registry = get_registry()

        def on_select(cmd) -> None:
            if cmd is not None:
                self.run_worker(self._command_worker(cmd, ""))

        self.push_screen(CommandPaletteScreen(registry.all()), callback=on_select)

    def action_new_session(self) -> None:
        self._run_command("/new")

    def action_list_sessions(self) -> None:
        self._run_command("/sessions")

    def action_compact(self) -> None:
        self._run_command("/compact")

    def action_toggle_thinking(self) -> None:
        self._run_command("/thinking")

    async def action_quit(self) -> None:
        if self._bridge:
            await self._bridge.stop()
        self.exit()

    async def on_unmount(self) -> None:
        if self._bridge:
            await self._bridge.stop()