From 27dc5849c3eaf4824d79938e7077abdbe2c82e24 Mon Sep 17 00:00:00 2001 From: grothedev Date: Fri, 29 May 2026 21:34:16 -0400 Subject: updates from claude. need to review. archiving rust and cpp stuff, going completely TS --- web-timeplot/src/data/csv-replay-source.js | 60 +++++++ web-timeplot/src/data/parse-replay-csv.js | 108 ++++++++++++ web-timeplot/src/data/source-registry.js | 85 ++++++++-- web-timeplot/src/data/synthetic-wave-source.js | 1 + web-timeplot/src/data/websocket-source.js | 224 +++++++++++++++++++++++++ 5 files changed, 460 insertions(+), 18 deletions(-) create mode 100644 web-timeplot/src/data/csv-replay-source.js create mode 100644 web-timeplot/src/data/parse-replay-csv.js create mode 100644 web-timeplot/src/data/websocket-source.js (limited to 'web-timeplot/src/data') diff --git a/web-timeplot/src/data/csv-replay-source.js b/web-timeplot/src/data/csv-replay-source.js new file mode 100644 index 0000000..c4e6a66 --- /dev/null +++ b/web-timeplot/src/data/csv-replay-source.js @@ -0,0 +1,60 @@ +import { BaseSource } from './base-source.js'; + +function clamp(value, min, max) { + return Math.min(max, Math.max(min, value)); +} + +export class CsvReplaySource extends BaseSource { + constructor(config = {}) { + super({ + replayRate: 1, + dataset: [], + ...config, + }); + this.sourceType = 'csv-replay'; + this.nextPointIndex = 0; + } + + start(startTimeMs = 0) { + super.start(); + this.reset(startTimeMs); + } + + reset() { + this.nextPointIndex = 0; + } + + updateConfig(nextConfig) { + const datasetChanged = nextConfig.dataset !== this.config.dataset; + super.updateConfig(nextConfig); + if (datasetChanged) { + this.reset(); + } + } + + update(currentPlotTimeMs) { + if (!this.running || !Array.isArray(this.config.dataset) || this.config.dataset.length === 0) { + return []; + } + + const replayRate = clamp(this.config.replayRate ?? 1, 0.1, 8); + const targetDatasetTimeMs = currentPlotTimeMs * replayRate; + const points = []; + + while (this.nextPointIndex < this.config.dataset.length) { + const datasetPoint = this.config.dataset[this.nextPointIndex]; + if (datasetPoint.timeMs > targetDatasetTimeMs) { + break; + } + + points.push({ + timeMs: datasetPoint.timeMs / replayRate, + value: datasetPoint.value, + sourceId: this.config.id ?? 'csv-replay', + }); + this.nextPointIndex += 1; + } + + return points; + } +} diff --git a/web-timeplot/src/data/parse-replay-csv.js b/web-timeplot/src/data/parse-replay-csv.js new file mode 100644 index 0000000..b6ce97a --- /dev/null +++ b/web-timeplot/src/data/parse-replay-csv.js @@ -0,0 +1,108 @@ +function splitRow(line) { + return line.split(/[;,\t]/).map((value) => value.trim()); +} + +function isNumeric(value) { + return value !== '' && Number.isFinite(Number(value)); +} + +function detectHeader(rows) { + if (rows.length === 0) { + return { hasHeader: false, headers: [] }; + } + + const [firstRow] = rows; + const hasHeader = firstRow.some((value) => !isNumeric(value)); + return { + hasHeader, + headers: hasHeader ? firstRow.map((value) => value.toLowerCase()) : [], + }; +} + +function detectTimeScale(headers) { + const timeHeader = headers.find((header) => header.includes('time') || header.includes('timestamp')); + if (!timeHeader) { + return 1; + } + + if (timeHeader.includes('sec') && !timeHeader.includes('msec') && !timeHeader.includes('ms')) { + return 1000; + } + + return 1; +} + +function detectColumnIndexes(headers, columnCount) { + if (headers.length === 0) { + return { + timeIndex: columnCount > 1 ? 0 : -1, + valueIndex: columnCount > 1 ? 1 : 0, + }; + } + + const timeIndex = headers.findIndex((header) => header.includes('time') || header.includes('timestamp')); + const valueIndex = headers.findIndex((header) => header.includes('value') || header.includes('signal') || header.includes('y')); + + return { + timeIndex, + valueIndex: valueIndex >= 0 ? valueIndex : (headers.length > 1 ? 1 : 0), + }; +} + +export function parseReplayCsv(text, { sampleRateHz = 60 } = {}) { + const rows = text + .split(/\r?\n/) + .map((line) => line.trim()) + .filter((line) => line && !line.startsWith('#')) + .map(splitRow) + .filter((row) => row.some((value) => value !== '')); + + if (rows.length === 0) { + throw new Error('CSV file is empty'); + } + + const { hasHeader, headers } = detectHeader(rows); + const dataRows = hasHeader ? rows.slice(1) : rows; + const columnCount = rows[0].length; + const { timeIndex, valueIndex } = detectColumnIndexes(headers, columnCount); + const timeScale = detectTimeScale(headers); + const intervalMs = 1000 / Math.max(1, sampleRateHz); + + const points = dataRows + .map((row, index) => { + const rawValue = row[valueIndex]; + if (!isNumeric(rawValue)) { + return null; + } + + const parsedValue = Number(rawValue); + const parsedTime = timeIndex >= 0 && isNumeric(row[timeIndex]) + ? Number(row[timeIndex]) * timeScale + : index * intervalMs; + + return { + timeMs: parsedTime, + value: parsedValue, + }; + }) + .filter(Boolean) + .sort((left, right) => left.timeMs - right.timeMs); + + if (points.length === 0) { + throw new Error('CSV file did not contain any numeric data points'); + } + + const firstTime = points[0].timeMs; + const normalizedPoints = points.map((point) => ({ + timeMs: point.timeMs - firstTime, + value: point.value, + })); + + return { + points: normalizedPoints, + metadata: { + pointCount: normalizedPoints.length, + durationMs: normalizedPoints.at(-1)?.timeMs ?? 0, + }, + }; +} diff --git a/web-timeplot/src/data/source-registry.js b/web-timeplot/src/data/source-registry.js index 06f5895..917d06b 100644 --- a/web-timeplot/src/data/source-registry.js +++ b/web-timeplot/src/data/source-registry.js @@ -1,41 +1,90 @@ +import { CsvReplaySource } from './csv-replay-source.js'; import { SyntheticWaveSource } from './synthetic-wave-source.js'; +import { WebSocketSource } from './websocket-source.js'; export class SourceRegistry { constructor(store, bus) { this.store = store; this.bus = bus; - this.sources = new Map([ - ['synthetic-wave', new SyntheticWaveSource(store.getState().source)], - ]); - this.activeSource = this.sources.get(store.getState().source.activeId); - this.activeSource.start(store.getState().time.plotTimeMs); + this.sources = new Map(); + this.syncFromState(); } syncFromState() { const state = this.store.getState(); - const nextSource = this.sources.get(state.source.activeId); + const sourceEntries = Object.entries(state.sources); + const activeKeys = new Set(sourceEntries.map(([sourceKey]) => sourceKey)); - if (nextSource !== this.activeSource) { - this.activeSource?.stop(); - this.activeSource = nextSource; - this.activeSource?.start(state.time.plotTimeMs); + for (const [sourceKey, config] of sourceEntries) { + const existingSource = this.sources.get(sourceKey); + + if (!existingSource) { + const nextSource = this.createSource(sourceKey, config); + this.sources.set(sourceKey, nextSource); + nextSource.start(state.time.plotTimeMs); + continue; + } + + if (existingSource.sourceType !== config.type) { + existingSource.stop(); + const replacementSource = this.createSource(sourceKey, config); + this.sources.set(sourceKey, replacementSource); + replacementSource.start(state.time.plotTimeMs); + continue; + } + + existingSource.updateConfig(config); } - this.activeSource?.updateConfig(state.source); + for (const [sourceKey, source] of this.sources.entries()) { + if (!activeKeys.has(sourceKey)) { + source.stop(); + this.sources.delete(sourceKey); + } + } } - update(currentPlotTimeMs) { - if (!this.activeSource) { - return; + createSource(sourceKey, config) { + switch (config.type) { + case 'csv-replay': + return new CsvReplaySource(config); + case 'websocket': + return new WebSocketSource(config, { + onStatusChange: (statusPatch) => { + this.store.setState((state) => ({ + ...state, + sources: { + ...state.sources, + [sourceKey]: { + ...state.sources[sourceKey], + ...statusPatch, + }, + }, + })); + }, + }); + case 'synthetic-wave': + default: + return new SyntheticWaveSource(config); } + } - const points = this.activeSource.update(currentPlotTimeMs); - for (const point of points) { - this.bus.emit('data:point', point); + update(currentPlotTimeMs) { + for (const [sourceKey, source] of this.sources.entries()) { + const points = source.update(currentPlotTimeMs); + for (const point of points) { + this.bus.emit('data:point', { + ...point, + sourceId: sourceKey, + }); + } } } reset() { - this.activeSource?.reset(this.store.getState().time.plotTimeMs); + const startTimeMs = this.store.getState().time.plotTimeMs; + for (const source of this.sources.values()) { + source.reset(startTimeMs); + } } } diff --git a/web-timeplot/src/data/synthetic-wave-source.js b/web-timeplot/src/data/synthetic-wave-source.js index 3cf7fb1..df53319 100644 --- a/web-timeplot/src/data/synthetic-wave-source.js +++ b/web-timeplot/src/data/synthetic-wave-source.js @@ -18,6 +18,7 @@ export class SyntheticWaveSource extends BaseSource { noise: 0.08, ...config, }); + this.sourceType = 'synthetic-wave'; this.lastEmittedPlotTimeMs = 0; } diff --git a/web-timeplot/src/data/websocket-source.js b/web-timeplot/src/data/websocket-source.js new file mode 100644 index 0000000..5458fb9 --- /dev/null +++ b/web-timeplot/src/data/websocket-source.js @@ -0,0 +1,224 @@ +import { BaseSource } from './base-source.js'; + +function clamp(value, min, max) { + return Math.min(max, Math.max(min, value)); +} + +function isFiniteNumber(value) { + return typeof value === 'number' && Number.isFinite(value); +} + +function parsePayload(payload) { + if (Array.isArray(payload)) { + return payload.flatMap((item) => parsePayload(item)); + } + + if (isFiniteNumber(payload)) { + return [{ value: payload, timestampMs: null }]; + } + + if (typeof payload === 'string') { + const trimmed = payload.trim(); + if (!trimmed) { + return []; + } + + const numeric = Number(trimmed); + if (Number.isFinite(numeric)) { + return [{ value: numeric, timestampMs: null }]; + } + + try { + return parsePayload(JSON.parse(trimmed)); + } catch { + return []; + } + } + + if (payload && typeof payload === 'object') { + const candidateValue = [payload.value, payload.y, payload.signal, payload.data] + .find((value) => Number.isFinite(Number(value))); + + if (candidateValue === undefined) { + return []; + } + + const candidateTimestamp = [payload.timeMs, payload.timestampMs, payload.timestamp, payload.t] + .find((value) => Number.isFinite(Number(value))); + + return [{ + value: Number(candidateValue), + timestampMs: candidateTimestamp === undefined ? null : Number(candidateTimestamp), + }]; + } + + return []; +} + +export class WebSocketSource extends BaseSource { + constructor(config = {}, { onStatusChange } = {}) { + super({ + wsUrl: 'ws://localhost:8080', + wsReconnectMs: 2000, + ...config, + }); + this.sourceType = 'websocket'; + this.onStatusChange = onStatusChange; + this.socket = null; + this.queue = []; + this.lastPlotTimeMs = 0; + this.reconnectTimer = null; + this.shouldReconnect = false; + this.firstSourceTimestampMs = null; + this.basePlotTimeMs = 0; + } + + start(startTimeMs = 0) { + super.start(); + this.lastPlotTimeMs = startTimeMs; + this.basePlotTimeMs = startTimeMs; + this.shouldReconnect = true; + this.connect(); + } + + stop() { + super.stop(); + this.shouldReconnect = false; + this.clearReconnectTimer(); + if (this.socket) { + this.socket.close(); + this.socket = null; + } + this.setStatus('disconnected', 'socket closed'); + } + + reset(startTimeMs = 0) { + this.queue = []; + this.lastPlotTimeMs = startTimeMs; + this.basePlotTimeMs = startTimeMs; + this.firstSourceTimestampMs = null; + } + + updateConfig(nextConfig) { + const previousUrl = this.config.wsUrl; + const previousReconnectMs = this.config.wsReconnectMs; + super.updateConfig(nextConfig); + + if ((previousUrl !== this.config.wsUrl || previousReconnectMs !== this.config.wsReconnectMs) && this.running) { + this.reconnect(); + } + } + + update(currentPlotTimeMs) { + this.lastPlotTimeMs = currentPlotTimeMs; + + if (this.queue.length === 0) { + return []; + } + + const points = []; + while (this.queue.length > 0) { + const nextPoint = this.queue.shift(); + let timeMs = currentPlotTimeMs; + + if (isFiniteNumber(nextPoint.timestampMs)) { + if (this.firstSourceTimestampMs === null) { + this.firstSourceTimestampMs = nextPoint.timestampMs; + this.basePlotTimeMs = currentPlotTimeMs; + } + timeMs = this.basePlotTimeMs + (nextPoint.timestampMs - this.firstSourceTimestampMs); + } + + points.push({ + timeMs, + value: nextPoint.value, + sourceId: this.config.id ?? 'websocket', + }); + } + + return points; + } + + reconnect() { + if (!this.running) { + return; + } + + this.clearReconnectTimer(); + if (this.socket) { + this.socket.close(); + this.socket = null; + } + this.connect(); + } + + connect() { + const url = this.config.wsUrl?.trim(); + if (!url) { + this.setStatus('idle', 'enter a websocket url'); + return; + } + + this.clearReconnectTimer(); + this.setStatus('connecting', url); + + try { + this.socket = new WebSocket(url); + } catch (error) { + this.setStatus('error', error instanceof Error ? error.message : String(error)); + this.scheduleReconnect(); + return; + } + + this.socket.addEventListener('open', () => { + this.setStatus('connected', url); + }); + + this.socket.addEventListener('message', (event) => { + const parsedPoints = parsePayload(event.data); + if (parsedPoints.length === 0) { + return; + } + this.queue.push(...parsedPoints); + }); + + this.socket.addEventListener('error', () => { + this.setStatus('error', 'socket error'); + }); + + this.socket.addEventListener('close', () => { + this.socket = null; + if (!this.running) { + return; + } + this.setStatus('disconnected', 'retrying'); + this.scheduleReconnect(); + }); + } + + scheduleReconnect() { + if (!this.shouldReconnect || !this.running) { + return; + } + + const reconnectMs = clamp(Number(this.config.wsReconnectMs) || 2000, 250, 30000); + this.clearReconnectTimer(); + this.reconnectTimer = window.setTimeout(() => { + this.connect(); + }, reconnectMs); + } + + clearReconnectTimer() { + if (this.reconnectTimer !== null) { + window.clearTimeout(this.reconnectTimer); + this.reconnectTimer = null; + } + } + + setStatus(status, detail = '') { + this.onStatusChange?.({ + wsStatus: status, + wsStatusDetail: detail, + }); + } +} -- cgit v1.2.3