"""Bridge between NaviWebSocketClient and Textual event loop."""
from __future__ import annotations
import asyncio
import json
from textual.app import App
from clients.terminal.tui.events import ConnectionStatusChanged, WsEvent
from clients.terminal.ws_client import NaviWebSocketClient
class WsBridge:
"""Wraps NaviWebSocketClient and forwards events to a Textual App."""
def __init__(self, app: App, session_id: str) -> None:
self.app = app
self.session_id = session_id
self._client = NaviWebSocketClient(session_id)
self._receive_task: asyncio.Task | None = None
self._connected = False
@property
def client(self) -> NaviWebSocketClient:
return self._client
@property
def connected(self) -> bool:
return self._connected
async def start(self) -> None:
try:
await self._client.connect()
self._connected = True
self._notify(True, "")
self._receive_task = asyncio.create_task(self._receive_loop())
except Exception as exc:
self._connected = False
self._notify(False, str(exc))
async def stop(self) -> None:
if self._receive_task:
self._receive_task.cancel()
try:
await self._receive_task
except asyncio.CancelledError:
pass
self._receive_task = None
await self._client.close()
self._connected = False
self._notify(False, "")
async def _receive_loop(self) -> None:
try:
ws = self._client._ws
if ws is None:
return
async for raw in ws:
try:
msg = json.loads(raw)
except json.JSONDecodeError:
continue
self._post(WsEvent(msg))
except Exception:
self._connected = False
self._notify(False, "disconnected")
def _post(self, message) -> None:
self.app.post_message(message)
def _notify(self, connected: bool, detail: str) -> None:
self._post(ConnectionStatusChanged(connected, detail))