summaryrefslogtreecommitdiff
path: root/src/data/websocket-source.js
diff options
context:
space:
mode:
authorgrothedev <grothedev@gmail.com>2026-05-29 21:49:20 -0400
committergrothedev <grothedev@gmail.com>2026-05-29 21:49:20 -0400
commit6196004b51a6850909c154f5402ff4858eab479a (patch)
tree126b8bb1600d0a656e0df016e25d08c390f3540e /src/data/websocket-source.js
parent27dc5849c3eaf4824d79938e7077abdbe2c82e24 (diff)
mv web stuff to root project dirHEADprototypeframeworkmain
Diffstat (limited to 'src/data/websocket-source.js')
-rw-r--r--src/data/websocket-source.js224
1 files changed, 224 insertions, 0 deletions
diff --git a/src/data/websocket-source.js b/src/data/websocket-source.js
new file mode 100644
index 0000000..5458fb9
--- /dev/null
+++ b/src/data/websocket-source.js
@@ -0,0 +1,224 @@
+import { BaseSource } from './base-source.js';
+
+function clamp(value, min, max) {
+ return Math.min(max, Math.max(min, value));
+}
+
+function isFiniteNumber(value) {
+ return typeof value === 'number' && Number.isFinite(value);
+}
+
+function parsePayload(payload) {
+ if (Array.isArray(payload)) {
+ return payload.flatMap((item) => parsePayload(item));
+ }
+
+ if (isFiniteNumber(payload)) {
+ return [{ value: payload, timestampMs: null }];
+ }
+
+ if (typeof payload === 'string') {
+ const trimmed = payload.trim();
+ if (!trimmed) {
+ return [];
+ }
+
+ const numeric = Number(trimmed);
+ if (Number.isFinite(numeric)) {
+ return [{ value: numeric, timestampMs: null }];
+ }
+
+ try {
+ return parsePayload(JSON.parse(trimmed));
+ } catch {
+ return [];
+ }
+ }
+
+ if (payload && typeof payload === 'object') {
+ const candidateValue = [payload.value, payload.y, payload.signal, payload.data]
+ .find((value) => Number.isFinite(Number(value)));
+
+ if (candidateValue === undefined) {
+ return [];
+ }
+
+ const candidateTimestamp = [payload.timeMs, payload.timestampMs, payload.timestamp, payload.t]
+ .find((value) => Number.isFinite(Number(value)));
+
+ return [{
+ value: Number(candidateValue),
+ timestampMs: candidateTimestamp === undefined ? null : Number(candidateTimestamp),
+ }];
+ }
+
+ return [];
+}
+
+export class WebSocketSource extends BaseSource {
+ constructor(config = {}, { onStatusChange } = {}) {
+ super({
+ wsUrl: 'ws://localhost:8080',
+ wsReconnectMs: 2000,
+ ...config,
+ });
+ this.sourceType = 'websocket';
+ this.onStatusChange = onStatusChange;
+ this.socket = null;
+ this.queue = [];
+ this.lastPlotTimeMs = 0;
+ this.reconnectTimer = null;
+ this.shouldReconnect = false;
+ this.firstSourceTimestampMs = null;
+ this.basePlotTimeMs = 0;
+ }
+
+ start(startTimeMs = 0) {
+ super.start();
+ this.lastPlotTimeMs = startTimeMs;
+ this.basePlotTimeMs = startTimeMs;
+ this.shouldReconnect = true;
+ this.connect();
+ }
+
+ stop() {
+ super.stop();
+ this.shouldReconnect = false;
+ this.clearReconnectTimer();
+ if (this.socket) {
+ this.socket.close();
+ this.socket = null;
+ }
+ this.setStatus('disconnected', 'socket closed');
+ }
+
+ reset(startTimeMs = 0) {
+ this.queue = [];
+ this.lastPlotTimeMs = startTimeMs;
+ this.basePlotTimeMs = startTimeMs;
+ this.firstSourceTimestampMs = null;
+ }
+
+ updateConfig(nextConfig) {
+ const previousUrl = this.config.wsUrl;
+ const previousReconnectMs = this.config.wsReconnectMs;
+ super.updateConfig(nextConfig);
+
+ if ((previousUrl !== this.config.wsUrl || previousReconnectMs !== this.config.wsReconnectMs) && this.running) {
+ this.reconnect();
+ }
+ }
+
+ update(currentPlotTimeMs) {
+ this.lastPlotTimeMs = currentPlotTimeMs;
+
+ if (this.queue.length === 0) {
+ return [];
+ }
+
+ const points = [];
+ while (this.queue.length > 0) {
+ const nextPoint = this.queue.shift();
+ let timeMs = currentPlotTimeMs;
+
+ if (isFiniteNumber(nextPoint.timestampMs)) {
+ if (this.firstSourceTimestampMs === null) {
+ this.firstSourceTimestampMs = nextPoint.timestampMs;
+ this.basePlotTimeMs = currentPlotTimeMs;
+ }
+ timeMs = this.basePlotTimeMs + (nextPoint.timestampMs - this.firstSourceTimestampMs);
+ }
+
+ points.push({
+ timeMs,
+ value: nextPoint.value,
+ sourceId: this.config.id ?? 'websocket',
+ });
+ }
+
+ return points;
+ }
+
+ reconnect() {
+ if (!this.running) {
+ return;
+ }
+
+ this.clearReconnectTimer();
+ if (this.socket) {
+ this.socket.close();
+ this.socket = null;
+ }
+ this.connect();
+ }
+
+ connect() {
+ const url = this.config.wsUrl?.trim();
+ if (!url) {
+ this.setStatus('idle', 'enter a websocket url');
+ return;
+ }
+
+ this.clearReconnectTimer();
+ this.setStatus('connecting', url);
+
+ try {
+ this.socket = new WebSocket(url);
+ } catch (error) {
+ this.setStatus('error', error instanceof Error ? error.message : String(error));
+ this.scheduleReconnect();
+ return;
+ }
+
+ this.socket.addEventListener('open', () => {
+ this.setStatus('connected', url);
+ });
+
+ this.socket.addEventListener('message', (event) => {
+ const parsedPoints = parsePayload(event.data);
+ if (parsedPoints.length === 0) {
+ return;
+ }
+ this.queue.push(...parsedPoints);
+ });
+
+ this.socket.addEventListener('error', () => {
+ this.setStatus('error', 'socket error');
+ });
+
+ this.socket.addEventListener('close', () => {
+ this.socket = null;
+ if (!this.running) {
+ return;
+ }
+ this.setStatus('disconnected', 'retrying');
+ this.scheduleReconnect();
+ });
+ }
+
+ scheduleReconnect() {
+ if (!this.shouldReconnect || !this.running) {
+ return;
+ }
+
+ const reconnectMs = clamp(Number(this.config.wsReconnectMs) || 2000, 250, 30000);
+ this.clearReconnectTimer();
+ this.reconnectTimer = window.setTimeout(() => {
+ this.connect();
+ }, reconnectMs);
+ }
+
+ clearReconnectTimer() {
+ if (this.reconnectTimer !== null) {
+ window.clearTimeout(this.reconnectTimer);
+ this.reconnectTimer = null;
+ }
+ }
+
+ setStatus(status, detail = '') {
+ this.onStatusChange?.({
+ wsStatus: status,
+ wsStatusDetail: detail,
+ });
+ }
+}