Newer
Older
navi-1 / webclient / src / composables / useWebSocket.js
import { ref, onUnmounted } from 'vue'
import { useChatStore } from '@/stores/chat.js'
import { useToast } from 'gnexus-ui-kit/vue'

const WS_SCHEME = location.protocol === 'https:' ? 'wss' : 'ws'
const WS_BASE = import.meta.env.DEV
  ? `${WS_SCHEME}://${location.hostname}:8000`
  : `${WS_SCHEME}://${location.host}`

const MAX_FAST_RETRIES = 3
const FAST_RETRY_BASE_MS = 1000
const BACKGROUND_RETRY_MS = 15000
const MAX_QUEUED_SENDS = 20

function getWsUrl(sessionId) {
  const url = `${WS_BASE}/ws/sessions/${sessionId}`
  // SECURITY: token is stored in localStorage (XSS risk) and passed via query
  // param (visible in server access logs). Future: switch to auth message.
  const token = localStorage.getItem('navi_api_token')
  if (token) {
    return `${url}?api_token=${encodeURIComponent(token)}`
  }
  return url
}

export function useWebSocket() {
  const chat = useChatStore()
  const toast = useToast()

  const connected = ref(false)
  const reconnecting = ref(false)
  const reconnectFailed = ref(false)  // true after MAX_FAST_RETRIES exhausted

  let ws = null
  let sessionId = null
  let retryCount = 0
  let retryTimer = null
  let destroyed = false
  let _isReconnect = false  // true when this open is a retry, not the first connect
  let _isConnecting = false // guard against redundant _connect() calls
  let queuedSends = []

  function connect(id) {
    destroyed = false
    sessionId = id
    retryCount = 0
    reconnectFailed.value = false
    queuedSends = []
    _connect()
  }

  function disconnect() {
    destroyed = true
    clearTimeout(retryTimer)
    ws?.close()
    ws = null
    queuedSends = []
    connected.value = false
    reconnecting.value = false
  }

  function send(payload) {
    if (ws?.readyState === WebSocket.OPEN) {
      ws.send(JSON.stringify(payload))
      return true
    }
    if (queuedSends.length >= MAX_QUEUED_SENDS) queuedSends.shift()
    queuedSends.push(payload)
    reconnecting.value = true
    if (!ws || ws.readyState === WebSocket.CLOSED || ws.readyState === WebSocket.CLOSING) {
      _connect()
    }
    return false
  }

  function _connect() {
    if (destroyed || _isConnecting) return
    _isConnecting = true
    if (ws) {
      ws.onclose = null  // Prevent old socket's onclose from triggering reconnect loop
      ws.close()
    }
    ws = new WebSocket(getWsUrl(sessionId))

    ws.onopen = () => {
      _isConnecting = false
      connected.value = true
      reconnecting.value = false
      reconnectFailed.value = false
      _isReconnect = retryCount > 0
      retryCount = 0
      _flushQueuedSends()
    }

    ws.onmessage = (e) => {
      let event
      try { event = JSON.parse(e.data) } catch { return }
      _dispatch(event)
    }

    ws.onclose = () => {
      _isConnecting = false
      connected.value = false
      if (!destroyed) _scheduleReconnect()
    }

    ws.onerror = () => {
      // onclose fires after onerror, no extra handling needed
    }
  }

  function _flushQueuedSends() {
    if (!ws || ws.readyState !== WebSocket.OPEN || !queuedSends.length) return
    const pending = queuedSends
    queuedSends = []
    for (const payload of pending) {
      ws.send(JSON.stringify(payload))
    }
  }

  function _scheduleReconnect() {
    if (destroyed) return
    reconnecting.value = true
    retryCount++

    if (retryCount <= MAX_FAST_RETRIES) {
      const delay = FAST_RETRY_BASE_MS * Math.pow(2, retryCount - 1)
      retryTimer = setTimeout(_connect, delay)
    } else {
      // Switch to background retry
      reconnectFailed.value = true
      reconnecting.value = false
      retryTimer = setTimeout(() => {
        if (!destroyed) {
          reconnecting.value = true
          _connect()
        }
      }, BACKGROUND_RETRY_MS)
    }
  }

  function _dispatch(event) {
    switch (event.type) {
      case 'stream_start':      chat.onStreamStart(); break
      case 'thinking_delta':    chat.onThinkingDelta(event.delta ?? ''); break
      case 'thinking_end':      chat.onThinkingEnd(); break
      case 'turn_thinking':     chat.onTurnThinking(event); break
      case 'planning_status':   chat.onPlanningStatus(event); break
      case 'plan_ready':        chat.onPlanReady(event); break
      case 'tool_started':      chat.onToolStarted(event); break
      case 'tool_call':         chat.onToolCall(event); break
      case 'terminal_output':    chat.onTerminalOutput(event); break
      case 'terminal_closed':  chat.onTerminalClosed(event); break
      case 'stream_delta':      chat.onStreamDelta(event.delta ?? ''); break
      case 'stream_end':        chat.onStreamEnd(event); break
      case 'stream_stopped':    chat.onStreamStopped(); break
      // ui_component is delivered via tool_call metadata, not a separate WS event.
      case 'profile_switched':  chat.onProfileSwitched(event); break
      case 'compression_started':chat.onCompressionStarted(event); break
      case 'context_compressed':chat.onContextCompressed(event); break
      case 'error':             chat.onError(event); break
      case 'replay_start':      chat.onReplayStart(); break
      case 'replay_end':        chat.onReplayEnd(); break
      case 'heartbeat':         break  // keep-alive ping, no action needed
      case 'recall_update':
        chat.onRecallUpdate(event)
        break
      case 'session_sync':
        // Reload session history whenever the server signals persistence.
        // This handles both reconnect-after-finish and edge cases where
        // the client missed the final stream_end event.
        // Skip while a stream is active — reloadSession would orphan the
        // live streaming message and break delta/tool rendering.
        if (sessionId && !chat.streaming) {
          chat.reloadSession(sessionId)
        }
        break
      case 'mcp_status_update':
        if (event.status === 'connected') {
          toast.success({
            title: 'MCP server connected',
            text: `${event.server_name} (${event.tool_count ?? '?'} tools)`,
          })
        } else {
          toast.error({
            title: 'MCP server disconnected',
            text: event.server_name,
          })
        }
        break
    }
  }

  onUnmounted(disconnect)

  return { connected, reconnecting, reconnectFailed, connect, disconnect, send }
}