summaryrefslogtreecommitdiff
path: root/src/data
diff options
context:
space:
mode:
Diffstat (limited to 'src/data')
-rw-r--r--src/data/base-source.js21
-rw-r--r--src/data/csv-replay-source.js60
-rw-r--r--src/data/parse-replay-csv.js108
-rw-r--r--src/data/source-registry.js90
-rw-r--r--src/data/synthetic-wave-source.js87
-rw-r--r--src/data/websocket-source.js224
6 files changed, 590 insertions, 0 deletions
diff --git a/src/data/base-source.js b/src/data/base-source.js
new file mode 100644
index 0000000..55dbdc3
--- /dev/null
+++ b/src/data/base-source.js
@@ -0,0 +1,21 @@
+export class BaseSource {
+ constructor(config = {}) {
+ this.config = { ...config };
+ this.running = false;
+ }
+
+ start() {
+ this.running = true;
+ }
+
+ stop() {
+ this.running = false;
+ }
+
+ updateConfig(nextConfig) {
+ this.config = {
+ ...this.config,
+ ...nextConfig,
+ };
+ }
+}
diff --git a/src/data/csv-replay-source.js b/src/data/csv-replay-source.js
new file mode 100644
index 0000000..c4e6a66
--- /dev/null
+++ b/src/data/csv-replay-source.js
@@ -0,0 +1,60 @@
+import { BaseSource } from './base-source.js';
+
+function clamp(value, min, max) {
+ return Math.min(max, Math.max(min, value));
+}
+
+export class CsvReplaySource extends BaseSource {
+ constructor(config = {}) {
+ super({
+ replayRate: 1,
+ dataset: [],
+ ...config,
+ });
+ this.sourceType = 'csv-replay';
+ this.nextPointIndex = 0;
+ }
+
+ start(startTimeMs = 0) {
+ super.start();
+ this.reset(startTimeMs);
+ }
+
+ reset() {
+ this.nextPointIndex = 0;
+ }
+
+ updateConfig(nextConfig) {
+ const datasetChanged = nextConfig.dataset !== this.config.dataset;
+ super.updateConfig(nextConfig);
+ if (datasetChanged) {
+ this.reset();
+ }
+ }
+
+ update(currentPlotTimeMs) {
+ if (!this.running || !Array.isArray(this.config.dataset) || this.config.dataset.length === 0) {
+ return [];
+ }
+
+ const replayRate = clamp(this.config.replayRate ?? 1, 0.1, 8);
+ const targetDatasetTimeMs = currentPlotTimeMs * replayRate;
+ const points = [];
+
+ while (this.nextPointIndex < this.config.dataset.length) {
+ const datasetPoint = this.config.dataset[this.nextPointIndex];
+ if (datasetPoint.timeMs > targetDatasetTimeMs) {
+ break;
+ }
+
+ points.push({
+ timeMs: datasetPoint.timeMs / replayRate,
+ value: datasetPoint.value,
+ sourceId: this.config.id ?? 'csv-replay',
+ });
+ this.nextPointIndex += 1;
+ }
+
+ return points;
+ }
+}
diff --git a/src/data/parse-replay-csv.js b/src/data/parse-replay-csv.js
new file mode 100644
index 0000000..b6ce97a
--- /dev/null
+++ b/src/data/parse-replay-csv.js
@@ -0,0 +1,108 @@
+function splitRow(line) {
+ return line.split(/[;,\t]/).map((value) => value.trim());
+}
+
+function isNumeric(value) {
+ return value !== '' && Number.isFinite(Number(value));
+}
+
+function detectHeader(rows) {
+ if (rows.length === 0) {
+ return { hasHeader: false, headers: [] };
+ }
+
+ const [firstRow] = rows;
+ const hasHeader = firstRow.some((value) => !isNumeric(value));
+ return {
+ hasHeader,
+ headers: hasHeader ? firstRow.map((value) => value.toLowerCase()) : [],
+ };
+}
+
+function detectTimeScale(headers) {
+ const timeHeader = headers.find((header) => header.includes('time') || header.includes('timestamp'));
+ if (!timeHeader) {
+ return 1;
+ }
+
+ if (timeHeader.includes('sec') && !timeHeader.includes('msec') && !timeHeader.includes('ms')) {
+ return 1000;
+ }
+
+ return 1;
+}
+
+function detectColumnIndexes(headers, columnCount) {
+ if (headers.length === 0) {
+ return {
+ timeIndex: columnCount > 1 ? 0 : -1,
+ valueIndex: columnCount > 1 ? 1 : 0,
+ };
+ }
+
+ const timeIndex = headers.findIndex((header) => header.includes('time') || header.includes('timestamp'));
+ const valueIndex = headers.findIndex((header) => header.includes('value') || header.includes('signal') || header.includes('y'));
+
+ return {
+ timeIndex,
+ valueIndex: valueIndex >= 0 ? valueIndex : (headers.length > 1 ? 1 : 0),
+ };
+}
+
+export function parseReplayCsv(text, { sampleRateHz = 60 } = {}) {
+ const rows = text
+ .split(/\r?\n/)
+ .map((line) => line.trim())
+ .filter((line) => line && !line.startsWith('#'))
+ .map(splitRow)
+ .filter((row) => row.some((value) => value !== ''));
+
+ if (rows.length === 0) {
+ throw new Error('CSV file is empty');
+ }
+
+ const { hasHeader, headers } = detectHeader(rows);
+ const dataRows = hasHeader ? rows.slice(1) : rows;
+ const columnCount = rows[0].length;
+ const { timeIndex, valueIndex } = detectColumnIndexes(headers, columnCount);
+ const timeScale = detectTimeScale(headers);
+ const intervalMs = 1000 / Math.max(1, sampleRateHz);
+
+ const points = dataRows
+ .map((row, index) => {
+ const rawValue = row[valueIndex];
+ if (!isNumeric(rawValue)) {
+ return null;
+ }
+
+ const parsedValue = Number(rawValue);
+ const parsedTime = timeIndex >= 0 && isNumeric(row[timeIndex])
+ ? Number(row[timeIndex]) * timeScale
+ : index * intervalMs;
+
+ return {
+ timeMs: parsedTime,
+ value: parsedValue,
+ };
+ })
+ .filter(Boolean)
+ .sort((left, right) => left.timeMs - right.timeMs);
+
+ if (points.length === 0) {
+ throw new Error('CSV file did not contain any numeric data points');
+ }
+
+ const firstTime = points[0].timeMs;
+ const normalizedPoints = points.map((point) => ({
+ timeMs: point.timeMs - firstTime,
+ value: point.value,
+ }));
+
+ return {
+ points: normalizedPoints,
+ metadata: {
+ pointCount: normalizedPoints.length,
+ durationMs: normalizedPoints.at(-1)?.timeMs ?? 0,
+ },
+ };
+}
diff --git a/src/data/source-registry.js b/src/data/source-registry.js
new file mode 100644
index 0000000..917d06b
--- /dev/null
+++ b/src/data/source-registry.js
@@ -0,0 +1,90 @@
+import { CsvReplaySource } from './csv-replay-source.js';
+import { SyntheticWaveSource } from './synthetic-wave-source.js';
+import { WebSocketSource } from './websocket-source.js';
+
+export class SourceRegistry {
+ constructor(store, bus) {
+ this.store = store;
+ this.bus = bus;
+ this.sources = new Map();
+ this.syncFromState();
+ }
+
+ syncFromState() {
+ const state = this.store.getState();
+ const sourceEntries = Object.entries(state.sources);
+ const activeKeys = new Set(sourceEntries.map(([sourceKey]) => sourceKey));
+
+ for (const [sourceKey, config] of sourceEntries) {
+ const existingSource = this.sources.get(sourceKey);
+
+ if (!existingSource) {
+ const nextSource = this.createSource(sourceKey, config);
+ this.sources.set(sourceKey, nextSource);
+ nextSource.start(state.time.plotTimeMs);
+ continue;
+ }
+
+ if (existingSource.sourceType !== config.type) {
+ existingSource.stop();
+ const replacementSource = this.createSource(sourceKey, config);
+ this.sources.set(sourceKey, replacementSource);
+ replacementSource.start(state.time.plotTimeMs);
+ continue;
+ }
+
+ existingSource.updateConfig(config);
+ }
+
+ for (const [sourceKey, source] of this.sources.entries()) {
+ if (!activeKeys.has(sourceKey)) {
+ source.stop();
+ this.sources.delete(sourceKey);
+ }
+ }
+ }
+
+ createSource(sourceKey, config) {
+ switch (config.type) {
+ case 'csv-replay':
+ return new CsvReplaySource(config);
+ case 'websocket':
+ return new WebSocketSource(config, {
+ onStatusChange: (statusPatch) => {
+ this.store.setState((state) => ({
+ ...state,
+ sources: {
+ ...state.sources,
+ [sourceKey]: {
+ ...state.sources[sourceKey],
+ ...statusPatch,
+ },
+ },
+ }));
+ },
+ });
+ case 'synthetic-wave':
+ default:
+ return new SyntheticWaveSource(config);
+ }
+ }
+
+ update(currentPlotTimeMs) {
+ for (const [sourceKey, source] of this.sources.entries()) {
+ const points = source.update(currentPlotTimeMs);
+ for (const point of points) {
+ this.bus.emit('data:point', {
+ ...point,
+ sourceId: sourceKey,
+ });
+ }
+ }
+ }
+
+ reset() {
+ const startTimeMs = this.store.getState().time.plotTimeMs;
+ for (const source of this.sources.values()) {
+ source.reset(startTimeMs);
+ }
+ }
+}
diff --git a/src/data/synthetic-wave-source.js b/src/data/synthetic-wave-source.js
new file mode 100644
index 0000000..df53319
--- /dev/null
+++ b/src/data/synthetic-wave-source.js
@@ -0,0 +1,87 @@
+import { BaseSource } from './base-source.js';
+
+function clamp(value, min, max) {
+ return Math.min(max, Math.max(min, value));
+}
+
+function createDeterministicNoise(seed) {
+ const x = Math.sin(seed * 12.9898) * 43758.5453;
+ return x - Math.floor(x);
+}
+
+export class SyntheticWaveSource extends BaseSource {
+ constructor(config = {}) {
+ super({
+ sampleRateHz: 60,
+ preset: 'telemetry',
+ amplitude: 1,
+ noise: 0.08,
+ ...config,
+ });
+ this.sourceType = 'synthetic-wave';
+ this.lastEmittedPlotTimeMs = 0;
+ }
+
+ start(startTimeMs = 0) {
+ super.start();
+ this.lastEmittedPlotTimeMs = startTimeMs;
+ }
+
+ stop() {
+ super.stop();
+ }
+
+ reset(startTimeMs = 0) {
+ this.lastEmittedPlotTimeMs = startTimeMs;
+ }
+
+ sampleValue(timeMs) {
+ const seconds = timeMs / 1000;
+ const amplitude = this.config.amplitude;
+ const noise = this.config.noise;
+ const grain = (createDeterministicNoise(timeMs * 0.017) - 0.5) * 2 * noise;
+
+ switch (this.config.preset) {
+ case 'chirp': {
+ const sweep = Math.sin(seconds * seconds * 1.4);
+ return amplitude * (0.7 * sweep + 0.3 * Math.sin(seconds * 7.5)) + grain;
+ }
+ case 'burst': {
+ const burstPhase = (seconds % 6) - 1.5;
+ const burst = Math.sin(seconds * 9.5) * Math.exp(-(burstPhase ** 2) * 0.8);
+ return amplitude * (0.45 * Math.sin(seconds * 2.1) + burst) + grain;
+ }
+ case 'telemetry':
+ default: {
+ const carrier = Math.sin(seconds * 2.2);
+ const secondary = 0.35 * Math.cos(seconds * 6.4 + Math.sin(seconds * 0.8));
+ const envelope = 0.15 * Math.sin(seconds * 0.33);
+ return amplitude * (carrier + secondary + envelope) + grain;
+ }
+ }
+ }
+
+ update(currentPlotTimeMs) {
+ if (!this.running) {
+ return [];
+ }
+
+ const intervalMs = 1000 / clamp(this.config.sampleRateHz, 1, 240);
+ if (currentPlotTimeMs < this.lastEmittedPlotTimeMs) {
+ this.lastEmittedPlotTimeMs = currentPlotTimeMs;
+ return [];
+ }
+
+ const points = [];
+ while (this.lastEmittedPlotTimeMs + intervalMs <= currentPlotTimeMs) {
+ this.lastEmittedPlotTimeMs += intervalMs;
+ points.push({
+ timeMs: this.lastEmittedPlotTimeMs,
+ value: this.sampleValue(this.lastEmittedPlotTimeMs),
+ sourceId: 'synthetic-wave',
+ });
+ }
+
+ return points;
+ }
+}
diff --git a/src/data/websocket-source.js b/src/data/websocket-source.js
new file mode 100644
index 0000000..5458fb9
--- /dev/null
+++ b/src/data/websocket-source.js
@@ -0,0 +1,224 @@
+import { BaseSource } from './base-source.js';
+
+function clamp(value, min, max) {
+ return Math.min(max, Math.max(min, value));
+}
+
+function isFiniteNumber(value) {
+ return typeof value === 'number' && Number.isFinite(value);
+}
+
+function parsePayload(payload) {
+ if (Array.isArray(payload)) {
+ return payload.flatMap((item) => parsePayload(item));
+ }
+
+ if (isFiniteNumber(payload)) {
+ return [{ value: payload, timestampMs: null }];
+ }
+
+ if (typeof payload === 'string') {
+ const trimmed = payload.trim();
+ if (!trimmed) {
+ return [];
+ }
+
+ const numeric = Number(trimmed);
+ if (Number.isFinite(numeric)) {
+ return [{ value: numeric, timestampMs: null }];
+ }
+
+ try {
+ return parsePayload(JSON.parse(trimmed));
+ } catch {
+ return [];
+ }
+ }
+
+ if (payload && typeof payload === 'object') {
+ const candidateValue = [payload.value, payload.y, payload.signal, payload.data]
+ .find((value) => Number.isFinite(Number(value)));
+
+ if (candidateValue === undefined) {
+ return [];
+ }
+
+ const candidateTimestamp = [payload.timeMs, payload.timestampMs, payload.timestamp, payload.t]
+ .find((value) => Number.isFinite(Number(value)));
+
+ return [{
+ value: Number(candidateValue),
+ timestampMs: candidateTimestamp === undefined ? null : Number(candidateTimestamp),
+ }];
+ }
+
+ return [];
+}
+
+export class WebSocketSource extends BaseSource {
+ constructor(config = {}, { onStatusChange } = {}) {
+ super({
+ wsUrl: 'ws://localhost:8080',
+ wsReconnectMs: 2000,
+ ...config,
+ });
+ this.sourceType = 'websocket';
+ this.onStatusChange = onStatusChange;
+ this.socket = null;
+ this.queue = [];
+ this.lastPlotTimeMs = 0;
+ this.reconnectTimer = null;
+ this.shouldReconnect = false;
+ this.firstSourceTimestampMs = null;
+ this.basePlotTimeMs = 0;
+ }
+
+ start(startTimeMs = 0) {
+ super.start();
+ this.lastPlotTimeMs = startTimeMs;
+ this.basePlotTimeMs = startTimeMs;
+ this.shouldReconnect = true;
+ this.connect();
+ }
+
+ stop() {
+ super.stop();
+ this.shouldReconnect = false;
+ this.clearReconnectTimer();
+ if (this.socket) {
+ this.socket.close();
+ this.socket = null;
+ }
+ this.setStatus('disconnected', 'socket closed');
+ }
+
+ reset(startTimeMs = 0) {
+ this.queue = [];
+ this.lastPlotTimeMs = startTimeMs;
+ this.basePlotTimeMs = startTimeMs;
+ this.firstSourceTimestampMs = null;
+ }
+
+ updateConfig(nextConfig) {
+ const previousUrl = this.config.wsUrl;
+ const previousReconnectMs = this.config.wsReconnectMs;
+ super.updateConfig(nextConfig);
+
+ if ((previousUrl !== this.config.wsUrl || previousReconnectMs !== this.config.wsReconnectMs) && this.running) {
+ this.reconnect();
+ }
+ }
+
+ update(currentPlotTimeMs) {
+ this.lastPlotTimeMs = currentPlotTimeMs;
+
+ if (this.queue.length === 0) {
+ return [];
+ }
+
+ const points = [];
+ while (this.queue.length > 0) {
+ const nextPoint = this.queue.shift();
+ let timeMs = currentPlotTimeMs;
+
+ if (isFiniteNumber(nextPoint.timestampMs)) {
+ if (this.firstSourceTimestampMs === null) {
+ this.firstSourceTimestampMs = nextPoint.timestampMs;
+ this.basePlotTimeMs = currentPlotTimeMs;
+ }
+ timeMs = this.basePlotTimeMs + (nextPoint.timestampMs - this.firstSourceTimestampMs);
+ }
+
+ points.push({
+ timeMs,
+ value: nextPoint.value,
+ sourceId: this.config.id ?? 'websocket',
+ });
+ }
+
+ return points;
+ }
+
+ reconnect() {
+ if (!this.running) {
+ return;
+ }
+
+ this.clearReconnectTimer();
+ if (this.socket) {
+ this.socket.close();
+ this.socket = null;
+ }
+ this.connect();
+ }
+
+ connect() {
+ const url = this.config.wsUrl?.trim();
+ if (!url) {
+ this.setStatus('idle', 'enter a websocket url');
+ return;
+ }
+
+ this.clearReconnectTimer();
+ this.setStatus('connecting', url);
+
+ try {
+ this.socket = new WebSocket(url);
+ } catch (error) {
+ this.setStatus('error', error instanceof Error ? error.message : String(error));
+ this.scheduleReconnect();
+ return;
+ }
+
+ this.socket.addEventListener('open', () => {
+ this.setStatus('connected', url);
+ });
+
+ this.socket.addEventListener('message', (event) => {
+ const parsedPoints = parsePayload(event.data);
+ if (parsedPoints.length === 0) {
+ return;
+ }
+ this.queue.push(...parsedPoints);
+ });
+
+ this.socket.addEventListener('error', () => {
+ this.setStatus('error', 'socket error');
+ });
+
+ this.socket.addEventListener('close', () => {
+ this.socket = null;
+ if (!this.running) {
+ return;
+ }
+ this.setStatus('disconnected', 'retrying');
+ this.scheduleReconnect();
+ });
+ }
+
+ scheduleReconnect() {
+ if (!this.shouldReconnect || !this.running) {
+ return;
+ }
+
+ const reconnectMs = clamp(Number(this.config.wsReconnectMs) || 2000, 250, 30000);
+ this.clearReconnectTimer();
+ this.reconnectTimer = window.setTimeout(() => {
+ this.connect();
+ }, reconnectMs);
+ }
+
+ clearReconnectTimer() {
+ if (this.reconnectTimer !== null) {
+ window.clearTimeout(this.reconnectTimer);
+ this.reconnectTimer = null;
+ }
+ }
+
+ setStatus(status, detail = '') {
+ this.onStatusChange?.({
+ wsStatus: status,
+ wsStatusDetail: detail,
+ });
+ }
+}