import { BaseSource } from './base-source.js'; function clamp(value, min, max) { return Math.min(max, Math.max(min, value)); } function isFiniteNumber(value) { return typeof value === 'number' && Number.isFinite(value); } function parsePayload(payload) { if (Array.isArray(payload)) { return payload.flatMap((item) => parsePayload(item)); } if (isFiniteNumber(payload)) { return [{ value: payload, timestampMs: null }]; } if (typeof payload === 'string') { const trimmed = payload.trim(); if (!trimmed) { return []; } const numeric = Number(trimmed); if (Number.isFinite(numeric)) { return [{ value: numeric, timestampMs: null }]; } try { return parsePayload(JSON.parse(trimmed)); } catch { return []; } } if (payload && typeof payload === 'object') { const candidateValue = [payload.value, payload.y, payload.signal, payload.data] .find((value) => Number.isFinite(Number(value))); if (candidateValue === undefined) { return []; } const candidateTimestamp = [payload.timeMs, payload.timestampMs, payload.timestamp, payload.t] .find((value) => Number.isFinite(Number(value))); return [{ value: Number(candidateValue), timestampMs: candidateTimestamp === undefined ? null : Number(candidateTimestamp), }]; } return []; } export class WebSocketSource extends BaseSource { constructor(config = {}, { onStatusChange } = {}) { super({ wsUrl: 'ws://localhost:8080', wsReconnectMs: 2000, ...config, }); this.sourceType = 'websocket'; this.onStatusChange = onStatusChange; this.socket = null; this.queue = []; this.lastPlotTimeMs = 0; this.reconnectTimer = null; this.shouldReconnect = false; this.firstSourceTimestampMs = null; this.basePlotTimeMs = 0; } start(startTimeMs = 0) { super.start(); this.lastPlotTimeMs = startTimeMs; this.basePlotTimeMs = startTimeMs; this.shouldReconnect = true; this.connect(); } stop() { super.stop(); this.shouldReconnect = false; this.clearReconnectTimer(); if (this.socket) { this.socket.close(); this.socket = null; } this.setStatus('disconnected', 'socket closed'); } reset(startTimeMs = 0) { this.queue = []; this.lastPlotTimeMs = startTimeMs; this.basePlotTimeMs = startTimeMs; this.firstSourceTimestampMs = null; } updateConfig(nextConfig) { const previousUrl = this.config.wsUrl; const previousReconnectMs = this.config.wsReconnectMs; super.updateConfig(nextConfig); if ((previousUrl !== this.config.wsUrl || previousReconnectMs !== this.config.wsReconnectMs) && this.running) { this.reconnect(); } } update(currentPlotTimeMs) { this.lastPlotTimeMs = currentPlotTimeMs; if (this.queue.length === 0) { return []; } const points = []; while (this.queue.length > 0) { const nextPoint = this.queue.shift(); let timeMs = currentPlotTimeMs; if (isFiniteNumber(nextPoint.timestampMs)) { if (this.firstSourceTimestampMs === null) { this.firstSourceTimestampMs = nextPoint.timestampMs; this.basePlotTimeMs = currentPlotTimeMs; } timeMs = this.basePlotTimeMs + (nextPoint.timestampMs - this.firstSourceTimestampMs); } points.push({ timeMs, value: nextPoint.value, sourceId: this.config.id ?? 'websocket', }); } return points; } reconnect() { if (!this.running) { return; } this.clearReconnectTimer(); if (this.socket) { this.socket.close(); this.socket = null; } this.connect(); } connect() { const url = this.config.wsUrl?.trim(); if (!url) { this.setStatus('idle', 'enter a websocket url'); return; } this.clearReconnectTimer(); this.setStatus('connecting', url); try { this.socket = new WebSocket(url); } catch (error) { this.setStatus('error', error instanceof Error ? error.message : String(error)); this.scheduleReconnect(); return; } this.socket.addEventListener('open', () => { this.setStatus('connected', url); }); this.socket.addEventListener('message', (event) => { const parsedPoints = parsePayload(event.data); if (parsedPoints.length === 0) { return; } this.queue.push(...parsedPoints); }); this.socket.addEventListener('error', () => { this.setStatus('error', 'socket error'); }); this.socket.addEventListener('close', () => { this.socket = null; if (!this.running) { return; } this.setStatus('disconnected', 'retrying'); this.scheduleReconnect(); }); } scheduleReconnect() { if (!this.shouldReconnect || !this.running) { return; } const reconnectMs = clamp(Number(this.config.wsReconnectMs) || 2000, 250, 30000); this.clearReconnectTimer(); this.reconnectTimer = window.setTimeout(() => { this.connect(); }, reconnectMs); } clearReconnectTimer() { if (this.reconnectTimer !== null) { window.clearTimeout(this.reconnectTimer); this.reconnectTimer = null; } } setStatus(status, detail = '') { this.onStatusChange?.({ wsStatus: status, wsStatusDetail: detail, }); } }