Newer
Older
navi-1 / clients / terminal / tui / ws_bridge.py
"""Bridge between NaviWebSocketClient and Textual event loop."""

from __future__ import annotations

import asyncio
import json

from textual.app import App

from clients.terminal.tui.events import ConnectionStatusChanged, WsEvent
from clients.terminal.ws_client import NaviWebSocketClient


class WsBridge:
    """Wraps NaviWebSocketClient and forwards events to a Textual App."""

    def __init__(self, app: App, session_id: str) -> None:
        self.app = app
        self.session_id = session_id
        self._client = NaviWebSocketClient(session_id)
        self._receive_task: asyncio.Task | None = None
        self._connected = False

    @property
    def client(self) -> NaviWebSocketClient:
        return self._client

    @property
    def connected(self) -> bool:
        return self._connected

    async def start(self) -> None:
        try:
            await self._client.connect()
            self._connected = True
            self._notify(True, "")
            self._receive_task = asyncio.create_task(self._receive_loop())
        except Exception as exc:
            self._connected = False
            self._notify(False, str(exc))

    async def stop(self) -> None:
        if self._receive_task:
            self._receive_task.cancel()
            try:
                await self._receive_task
            except asyncio.CancelledError:
                pass
            self._receive_task = None
        await self._client.close()
        self._connected = False
        self._notify(False, "")

    async def _receive_loop(self) -> None:
        try:
            ws = self._client._ws
            if ws is None:
                return
            async for raw in ws:
                try:
                    msg = json.loads(raw)
                except json.JSONDecodeError:
                    continue
                self._post(WsEvent(msg))
        except Exception:
            self._connected = False
            self._notify(False, "disconnected")

    def _post(self, message) -> None:
        self.app.post_message(message)

    def _notify(self, connected: bool, detail: str) -> None:
        self._post(ConnectionStatusChanged(connected, detail))