summaryrefslogtreecommitdiff
path: root/web-timeplot/src/data-sources.js
diff options
context:
space:
mode:
Diffstat (limited to 'web-timeplot/src/data-sources.js')
-rw-r--r--web-timeplot/src/data-sources.js517
1 files changed, 517 insertions, 0 deletions
diff --git a/web-timeplot/src/data-sources.js b/web-timeplot/src/data-sources.js
new file mode 100644
index 0000000..749a151
--- /dev/null
+++ b/web-timeplot/src/data-sources.js
@@ -0,0 +1,517 @@
+/**
+ * Data Sources - Components that generate or provide data to plots
+ *
+ * This module implements the data provider side of the architecture.
+ * Data sources know how to generate or fetch data, but don't know
+ * anything about visualization.
+ *
+ * Architecture:
+ * - DataSource: Base class with event emitting
+ * - Specific sources: Implement different data generation strategies
+ * - Connection: Links sources to plots (see plot-connections.js)
+ */
+
+// Simple EventEmitter (same as in state.js, could be extracted to utils)
+class EventEmitter {
+ constructor() {
+ this.events = new Map();
+ }
+
+ on(event, callback) {
+ if (!this.events.has(event)) {
+ this.events.set(event, []);
+ }
+ this.events.get(event).push(callback);
+ return () => this.off(event, callback);
+ }
+
+ off(event, callback) {
+ if (!this.events.has(event)) return;
+ const callbacks = this.events.get(event);
+ const index = callbacks.indexOf(callback);
+ if (index > -1) {
+ callbacks.splice(index, 1);
+ }
+ }
+
+ emit(event, data) {
+ if (!this.events.has(event)) return;
+ this.events.get(event).forEach(callback => {
+ try {
+ callback(data);
+ } catch (e) {
+ console.error(`[DataSource] Error in event handler for '${event}':`, e);
+ }
+ });
+ }
+}
+
+/**
+ * Base class for all data sources
+ *
+ * Events emitted:
+ * - 'line': {points: Array, timestamp: number, metadata: Object}
+ * - 'point': {value: number, timestamp: number}
+ * - 'error': {error: Error}
+ */
+export class DataSource extends EventEmitter {
+ constructor(config = {}) {
+ super();
+ this.config = config;
+ this.isRunning = false;
+ this.time = 0;
+ }
+
+ /**
+ * Start generating/providing data
+ */
+ start() {
+ this.isRunning = true;
+ }
+
+ /**
+ * Stop generating/providing data
+ */
+ stop() {
+ this.isRunning = false;
+ }
+
+ /**
+ * Reset the data source to initial state
+ */
+ reset() {
+ this.time = 0;
+ }
+
+ /**
+ * Emit a complete line of data
+ */
+ emitLine(points, metadata = {}) {
+ this.emit('line', {
+ points,
+ timestamp: metadata.timestamp || Date.now(),
+ metadata,
+ });
+ }
+
+ /**
+ * Emit a single data point
+ */
+ emitPoint(value, timestamp = Date.now()) {
+ this.emit('point', {
+ value,
+ timestamp,
+ });
+ }
+
+ /**
+ * Emit an error
+ */
+ emitError(error) {
+ this.emit('error', { error });
+ }
+}
+
+/**
+ * Synthetic data source using test generators
+ * Uses the generators from test-data-generators.js
+ */
+export class SyntheticDataSource extends DataSource {
+ constructor(config = {}) {
+ super(config);
+ this.generator = config.generator; // Instance of DataGenerator
+ this.pointsPerLine = config.pointsPerLine || 100;
+ this.width = config.width || 800;
+ this.lineInterval = config.lineInterval || 100; // ms between lines
+ this.intervalHandle = null;
+ }
+
+ start() {
+ if (this.isRunning) return;
+ super.start();
+
+ // Generate a new line periodically
+ this.intervalHandle = setInterval(() => {
+ this.generateAndEmitLine();
+ }, this.lineInterval);
+
+ // Generate initial line immediately
+ this.generateAndEmitLine();
+ }
+
+ stop() {
+ super.stop();
+ if (this.intervalHandle) {
+ clearInterval(this.intervalHandle);
+ this.intervalHandle = null;
+ }
+ }
+
+ generateAndEmitLine() {
+ if (!this.generator) {
+ this.emitError(new Error('No generator configured'));
+ return;
+ }
+
+ const points = this.generator.generateLine(this.pointsPerLine, this.width);
+ this.emitLine(points, {
+ timestamp: Date.now(),
+ generatorType: this.generator.constructor.name,
+ });
+ }
+
+ setGenerator(generator) {
+ this.generator = generator;
+ }
+}
+
+/**
+ * Function-based data source
+ * Evaluates a user-provided function to generate data
+ */
+export class FunctionDataSource extends DataSource {
+ constructor(config = {}) {
+ super(config);
+ // Function should have signature: (x, t) => y
+ // x: normalized position 0-1
+ // t: time in seconds
+ // returns: y value
+ this.func = config.func || ((x, t) => Math.sin(x * 10 + t));
+ this.pointsPerLine = config.pointsPerLine || 100;
+ this.width = config.width || 800;
+ this.amplitude = config.amplitude || 30;
+ this.lineInterval = config.lineInterval || 100;
+ this.intervalHandle = null;
+ }
+
+ start() {
+ if (this.isRunning) return;
+ super.start();
+
+ this.intervalHandle = setInterval(() => {
+ this.generateAndEmitLine();
+ }, this.lineInterval);
+
+ this.generateAndEmitLine();
+ }
+
+ stop() {
+ super.stop();
+ if (this.intervalHandle) {
+ clearInterval(this.intervalHandle);
+ this.intervalHandle = null;
+ }
+ }
+
+ generateAndEmitLine() {
+ const points = [];
+ const t = this.time;
+
+ for (let i = 0; i < this.pointsPerLine; i++) {
+ const x = (i / this.pointsPerLine) * this.width;
+ const normalizedX = i / this.pointsPerLine;
+ const y = this.func(normalizedX, t) * this.amplitude;
+ points.push({ x, y });
+ }
+
+ this.emitLine(points, {
+ timestamp: Date.now(),
+ time: t,
+ });
+
+ this.time += this.lineInterval / 1000;
+ }
+
+ setFunction(func) {
+ this.func = func;
+ }
+}
+
+/**
+ * Streaming data source
+ * Emits individual data points that get buffered into lines
+ */
+export class StreamingDataSource extends DataSource {
+ constructor(config = {}) {
+ super(config);
+ this.generator = config.generator;
+ this.sampleRate = config.sampleRate || 60; // Samples per second
+ this.intervalHandle = null;
+ }
+
+ start() {
+ if (this.isRunning) return;
+ super.start();
+
+ const intervalMs = 1000 / this.sampleRate;
+ this.intervalHandle = setInterval(() => {
+ this.generateAndEmitPoint();
+ }, intervalMs);
+ }
+
+ stop() {
+ super.stop();
+ if (this.intervalHandle) {
+ clearInterval(this.intervalHandle);
+ this.intervalHandle = null;
+ }
+ }
+
+ generateAndEmitPoint() {
+ if (!this.generator) {
+ this.emitError(new Error('No generator configured'));
+ return;
+ }
+
+ const value = this.generator.sample();
+ this.generator.time += 1 / this.generator.sampleRate;
+ this.emitPoint(value, Date.now());
+ }
+
+ setGenerator(generator) {
+ this.generator = generator;
+ }
+}
+
+/**
+ * WebSocket data source (for real data)
+ * Receives data from a WebSocket connection
+ */
+export class WebSocketDataSource extends DataSource {
+ constructor(config = {}) {
+ super(config);
+ this.url = config.url;
+ this.socket = null;
+ this.reconnectInterval = config.reconnectInterval || 5000;
+ this.reconnectHandle = null;
+ }
+
+ start() {
+ if (this.isRunning) return;
+ super.start();
+ this.connect();
+ }
+
+ stop() {
+ super.stop();
+ if (this.socket) {
+ this.socket.close();
+ this.socket = null;
+ }
+ if (this.reconnectHandle) {
+ clearTimeout(this.reconnectHandle);
+ this.reconnectHandle = null;
+ }
+ }
+
+ connect() {
+ try {
+ this.socket = new WebSocket(this.url);
+
+ this.socket.onopen = () => {
+ console.log(`[WebSocketDataSource] Connected to ${this.url}`);
+ };
+
+ this.socket.onmessage = (event) => {
+ this.handleMessage(event.data);
+ };
+
+ this.socket.onerror = (error) => {
+ console.error('[WebSocketDataSource] Error:', error);
+ this.emitError(error);
+ };
+
+ this.socket.onclose = () => {
+ console.log('[WebSocketDataSource] Connection closed');
+ if (this.isRunning) {
+ // Auto-reconnect
+ this.reconnectHandle = setTimeout(() => {
+ this.connect();
+ }, this.reconnectInterval);
+ }
+ };
+ } catch (error) {
+ console.error('[WebSocketDataSource] Failed to connect:', error);
+ this.emitError(error);
+ }
+ }
+
+ handleMessage(data) {
+ try {
+ const parsed = JSON.parse(data);
+
+ // Expect format: {type: 'line', points: [...]} or {type: 'point', value: ...}
+ if (parsed.type === 'line' && parsed.points) {
+ this.emitLine(parsed.points, parsed.metadata || {});
+ } else if (parsed.type === 'point' && parsed.value !== undefined) {
+ this.emitPoint(parsed.value, parsed.timestamp);
+ } else {
+ console.warn('[WebSocketDataSource] Unknown message format:', parsed);
+ }
+ } catch (error) {
+ console.error('[WebSocketDataSource] Failed to parse message:', error);
+ this.emitError(error);
+ }
+ }
+
+ send(data) {
+ if (this.socket && this.socket.readyState === WebSocket.OPEN) {
+ this.socket.send(JSON.stringify(data));
+ }
+ }
+}
+
+/**
+ * CSV File data source
+ * Reads data from CSV files (for replay/analysis)
+ */
+export class CSVDataSource extends DataSource {
+ constructor(config = {}) {
+ super(config);
+ this.data = []; // Parsed CSV data
+ this.currentIndex = 0;
+ this.playbackRate = config.playbackRate || 1.0;
+ this.loop = config.loop || false;
+ this.intervalHandle = null;
+ }
+
+ /**
+ * Load CSV data from a string
+ * Expected format: timestamp,value or x,y format
+ */
+ loadCSV(csvString) {
+ const lines = csvString.trim().split('\n');
+ const headers = lines[0].split(',').map(h => h.trim());
+
+ this.data = [];
+ for (let i = 1; i < lines.length; i++) {
+ const values = lines[i].split(',').map(v => parseFloat(v.trim()));
+ if (values.length >= 2 && !values.some(isNaN)) {
+ this.data.push({
+ timestamp: values[0],
+ value: values[1],
+ });
+ }
+ }
+
+ console.log(`[CSVDataSource] Loaded ${this.data.length} data points`);
+ }
+
+ start() {
+ if (this.isRunning || this.data.length === 0) return;
+ super.start();
+
+ // Play back at specified rate
+ this.intervalHandle = setInterval(() => {
+ this.emitNextPoint();
+ }, 16 / this.playbackRate); // ~60fps adjusted by playback rate
+ }
+
+ stop() {
+ super.stop();
+ if (this.intervalHandle) {
+ clearInterval(this.intervalHandle);
+ this.intervalHandle = null;
+ }
+ }
+
+ reset() {
+ super.reset();
+ this.currentIndex = 0;
+ }
+
+ emitNextPoint() {
+ if (this.currentIndex >= this.data.length) {
+ if (this.loop) {
+ this.currentIndex = 0;
+ } else {
+ this.stop();
+ return;
+ }
+ }
+
+ const point = this.data[this.currentIndex];
+ this.emitPoint(point.value, point.timestamp);
+ this.currentIndex++;
+ }
+}
+
+/**
+ * Multi-source combiner
+ * Combines data from multiple sources
+ */
+export class CompositeDataSource extends DataSource {
+ constructor(config = {}) {
+ super(config);
+ this.sources = config.sources || [];
+ this.combineMode = config.combineMode || 'average'; // 'average', 'sum', 'max', 'min'
+ this.pointBuffer = new Map(); // sourceId => latest point
+ }
+
+ start() {
+ if (this.isRunning) return;
+ super.start();
+
+ // Subscribe to all sources
+ this.sources.forEach((source, idx) => {
+ source.on('point', (data) => {
+ this.handleSourcePoint(idx, data);
+ });
+ source.on('line', (data) => {
+ this.handleSourceLine(idx, data);
+ });
+ source.start();
+ });
+ }
+
+ stop() {
+ super.stop();
+ this.sources.forEach(source => source.stop());
+ }
+
+ handleSourcePoint(sourceIdx, data) {
+ this.pointBuffer.set(sourceIdx, data.value);
+
+ // If we have data from all sources, combine and emit
+ if (this.pointBuffer.size === this.sources.length) {
+ const combined = this.combineValues(Array.from(this.pointBuffer.values()));
+ this.emitPoint(combined, data.timestamp);
+ }
+ }
+
+ handleSourceLine(sourceIdx, data) {
+ // For lines, just pass through for now
+ // Could implement line combination if needed
+ this.emitLine(data.points, data.metadata);
+ }
+
+ combineValues(values) {
+ switch (this.combineMode) {
+ case 'sum':
+ return values.reduce((a, b) => a + b, 0);
+ case 'average':
+ return values.reduce((a, b) => a + b, 0) / values.length;
+ case 'max':
+ return Math.max(...values);
+ case 'min':
+ return Math.min(...values);
+ default:
+ return values[0];
+ }
+ }
+
+ addSource(source) {
+ this.sources.push(source);
+ if (this.isRunning) {
+ source.start();
+ }
+ }
+
+ removeSource(source) {
+ const idx = this.sources.indexOf(source);
+ if (idx > -1) {
+ source.stop();
+ this.sources.splice(idx, 1);
+ }
+ }
+}