/**
 * Data Sources - Components that generate or provide data to plots
 *
 * This module implements the data provider side of the architecture.
 * Data sources know how to generate or fetch data, but don't know
 * anything about visualization.
 *
 * Architecture:
 * - DataSource: Base class with event emitting
 * - Specific sources: Implement different data generation strategies
 * - Connection: Links sources to plots (see plot-connections.js)
 */

// Simple EventEmitter (same as in state.js, could be extracted to utils)
class EventEmitter {
  constructor() {
    // event name => array of listener callbacks, in registration order
    this.events = new Map();
  }

  /**
   * Register a listener for an event.
   *
   * @param {string} event - Event name.
   * @param {Function} callback - Called with the emitted payload.
   * @returns {Function} Function that removes this listener when called.
   */
  on(event, callback) {
    if (!this.events.has(event)) {
      this.events.set(event, []);
    }
    this.events.get(event).push(callback);
    return () => this.off(event, callback);
  }

  /**
   * Remove a previously registered listener. Unknown events or callbacks
   * are ignored.
   *
   * @param {string} event - Event name.
   * @param {Function} callback - The listener passed to `on`.
   */
  off(event, callback) {
    const callbacks = this.events.get(event);
    if (!callbacks) return;
    const index = callbacks.indexOf(callback);
    if (index > -1) {
      callbacks.splice(index, 1);
    }
  }

  /**
   * Call every listener registered for an event. An exception thrown by one
   * listener is logged and does not prevent the remaining listeners from
   * running.
   *
   * @param {string} event - Event name.
   * @param {*} data - Payload handed to each listener.
   */
  emit(event, data) {
    const callbacks = this.events.get(event);
    if (!callbacks) return;
    // Iterate over a snapshot: a listener may unsubscribe (itself or others)
    // while we dispatch, and splicing the live array during forEach would
    // make the following listener be skipped.
    [...callbacks].forEach((callback) => {
      try {
        callback(data);
      } catch (e) {
        console.error(`[DataSource] Error in event handler for '${event}':`, e);
      }
    });
  }
}

/**
 * Base class for all data sources
 *
 * Events emitted:
 * - 'line': {points: Array, timestamp: number, metadata: Object}
 * - 'point': {value: number, timestamp: number}
 * - 'error': {error: Error}
 */
export class DataSource extends EventEmitter {
  /**
   * @param {Object} [config] - Source-specific options, stored as `this.config`.
   */
  constructor(config = {}) {
    super();
    this.config = config;
    this.isRunning = false;
    this.time = 0;
  }

  /**
   * Start generating/providing data
   */
  start() {
    this.isRunning = true;
  }

  /**
   * Stop generating/providing data
   */
  stop() {
    this.isRunning = false;
  }

  /**
   * Reset the data source to initial state
   */
  reset() {
    this.time = 0;
  }

  /**
   * Emit a complete line of data
   *
   * @param {Array} points - Points making up the line.
   * @param {Object} [metadata] - Extra information; `metadata.timestamp`, when
   *   present, becomes the event timestamp, otherwise the current time is used.
   */
  emitLine(points, metadata = {}) {
    const { timestamp } = metadata;
    // A timestamp of 0 is a legitimate value and must not be replaced by "now";
    // every other falsy value (undefined, null, '', NaN) still falls back.
    const hasTimestamp = Boolean(timestamp) || timestamp === 0;
    this.emit('line', {
      points,
      timestamp: hasTimestamp ? timestamp : Date.now(),
      metadata,
    });
  }

  /**
   * Emit a single data point
   *
   * @param {number} value - The sample value.
   * @param {number} [timestamp] - Sample time; defaults to the current time
   *   when omitted or null.
   */
  emitPoint(value, timestamp = Date.now()) {
    this.emit('point', {
      value,
      // The default parameter only covers `undefined`; an explicit null
      // (e.g. from parsed JSON) must also fall back to the current time.
      timestamp: timestamp === null ? Date.now() : timestamp,
    });
  }

  /**
   * Emit an error
   *
   * @param {*} error - The error to report to 'error' listeners.
   */
  emitError(error) {
    this.emit('error', { error });
  }
}

/**
 * Synthetic data source using test generators
 * Uses the generators from test-data-generators.js
 */
export class SyntheticDataSource extends DataSource {
  /**
   * @param {Object} [config]
   * @param {Object} [config.generator] - Object exposing `generateLine(pointsPerLine, width)`.
   * @param {number} [config.pointsPerLine=100]
   * @param {number} [config.width=800]
   * @param {number} [config.lineInterval=100] - Milliseconds between lines.
   */
  constructor(config = {}) {
    super(config);
    this.generator = config.generator; // Instance of DataGenerator
    this.pointsPerLine = config.pointsPerLine || 100;
    this.width = config.width || 800;
    this.lineInterval = config.lineInterval || 100; // ms between lines
    this.intervalHandle = null;
  }

  /**
   * Begin emitting lines: one immediately, then one every `lineInterval` ms.
   * Does nothing if already running.
   */
  start() {
    if (this.isRunning) return;
    super.start();

    // Generate a new line periodically
    this.intervalHandle = setInterval(() => {
      this.generateAndEmitLine();
    }, this.lineInterval);

    // Generate initial line immediately
    this.generateAndEmitLine();
  }

  /**
   * Stop emitting and cancel the periodic timer.
   */
  stop() {
    super.stop();
    if (this.intervalHandle) {
      clearInterval(this.intervalHandle);
      this.intervalHandle = null;
    }
  }

  /**
   * Ask the generator for one line and emit it, or emit an 'error' event when
   * no generator is configured.
   */
  generateAndEmitLine() {
    if (!this.generator) {
      this.emitError(new Error('No generator configured'));
      return;
    }

    const points = this.generator.generateLine(this.pointsPerLine, this.width);
    this.emitLine(points, {
      timestamp: Date.now(),
      generatorType: this.generator.constructor.name,
    });
  }

  /**
   * Replace the generator used for subsequent lines.
   *
   * @param {Object} generator
   */
  setGenerator(generator) {
    this.generator = generator;
  }
}

/**
 * Function-based data source
 * Evaluates a user-provided function to generate data
 */
export class FunctionDataSource extends DataSource {
  /**
   * @param {Object} [config]
   * @param {Function} [config.func] - `(x, t) => y`; defaults to a travelling sine.
   * @param {number} [config.pointsPerLine=100]
   * @param {number} [config.width=800]
   * @param {number} [config.amplitude=30] - Multiplier applied to `func`'s result; 0 is allowed.
   * @param {number} [config.lineInterval=100] - Milliseconds between lines.
   */
  constructor(config = {}) {
    super(config);
    // Function should have signature: (x, t) => y
    // x: normalized position 0-1
    // t: time in seconds
    // returns: y value
    this.func = config.func || ((x, t) => Math.sin(x * 10 + t));
    this.pointsPerLine = config.pointsPerLine || 100;
    this.width = config.width || 800;
    // An amplitude of 0 (a flat line) is a valid request, so only the other
    // falsy values fall back to the default.
    this.amplitude =
      config.amplitude || config.amplitude === 0 ? config.amplitude : 30;
    this.lineInterval = config.lineInterval || 100;
    this.intervalHandle = null;
  }

  /**
   * Begin emitting lines: one immediately, then one every `lineInterval` ms.
   * Does nothing if already running.
   */
  start() {
    if (this.isRunning) return;
    super.start();

    this.intervalHandle = setInterval(() => {
      this.generateAndEmitLine();
    }, this.lineInterval);

    this.generateAndEmitLine();
  }

  /**
   * Stop emitting and cancel the periodic timer.
   */
  stop() {
    super.stop();
    if (this.intervalHandle) {
      clearInterval(this.intervalHandle);
      this.intervalHandle = null;
    }
  }

  /**
   * Sample the function at `pointsPerLine` evenly spaced positions, emit the
   * resulting line, then advance `this.time` by `lineInterval` (in seconds).
   */
  generateAndEmitLine() {
    const points = [];
    const t = this.time;

    for (let i = 0; i < this.pointsPerLine; i++) {
      const normalizedX = i / this.pointsPerLine;
      const x = normalizedX * this.width;
      const y = this.func(normalizedX, t) * this.amplitude;
      points.push({ x, y });
    }

    this.emitLine(points, {
      timestamp: Date.now(),
      time: t,
    });

    this.time += this.lineInterval / 1000;
  }

  /**
   * Replace the function evaluated for subsequent lines.
   *
   * @param {Function} func - `(x, t) => y`.
   */
  setFunction(func) {
    this.func = func;
  }
}

/**
 * Streaming data source
 * Emits individual data points that get buffered into lines
 */
export class StreamingDataSource extends DataSource {
  /**
   * @param {Object} [config]
   * @param {Object} [config.generator] - Object exposing `sample()`.
   * @param {number} [config.sampleRate=60] - Samples per second.
   */
  constructor(config = {}) {
    super(config);
    this.generator = config.generator;
    this.sampleRate = config.sampleRate || 60; // Samples per second
    this.intervalHandle = null;
  }

  /**
   * Begin emitting one point every `1000 / sampleRate` ms.
   * Does nothing if already running.
   */
  start() {
    if (this.isRunning) return;
    super.start();

    const intervalMs = 1000 / this.sampleRate;
    this.intervalHandle = setInterval(() => {
      this.generateAndEmitPoint();
    }, intervalMs);
  }

  /**
   * Stop emitting and cancel the periodic timer.
   */
  stop() {
    super.stop();
    if (this.intervalHandle) {
      clearInterval(this.intervalHandle);
      this.intervalHandle = null;
    }
  }

  /**
   * Take one sample from the generator, advance the generator's clock and
   * emit the sample, or emit an 'error' event when no generator is configured.
   */
  generateAndEmitPoint() {
    if (!this.generator) {
      this.emitError(new Error('No generator configured'));
      return;
    }

    const value = this.generator.sample();
    // NOTE(review): this advances the generator's own clock using the
    // generator's `sampleRate`, not this source's `sampleRate` - confirm the
    // generator always defines it, otherwise `time` becomes NaN.
    this.generator.time += 1 / this.generator.sampleRate;

    this.emitPoint(value, Date.now());
  }

  /**
   * Replace the generator used for subsequent samples.
   *
   * @param {Object} generator
   */
  setGenerator(generator) {
    this.generator = generator;
  }
}

/**
 * WebSocket data source (for real data)
 * Receives data from a WebSocket connection
 */
export class WebSocketDataSource extends DataSource {
  /**
   * @param {Object} [config]
   * @param {string} [config.url] - WebSocket endpoint.
   * @param {number} [config.reconnectInterval=5000] - Delay in ms before
   *   reconnecting after the connection closes while running.
   */
  constructor(config = {}) {
    super(config);
    this.url = config.url;
    this.socket = null;
    this.reconnectInterval = config.reconnectInterval || 5000;
    this.reconnectHandle = null;
  }

  /**
   * Open the connection. Does nothing if already running.
   */
  start() {
    if (this.isRunning) return;
    super.start();
    this.connect();
  }

  /**
   * Close the connection and cancel any pending reconnect.
   */
  stop() {
    super.stop();

    // Forget the socket before closing it so its handlers recognise
    // themselves as stale when the close event is delivered.
    const socket = this.socket;
    this.socket = null;
    if (socket) {
      socket.close();
    }

    if (this.reconnectHandle) {
      clearTimeout(this.reconnectHandle);
      this.reconnectHandle = null;
    }
  }

  /**
   * Create the WebSocket and wire its handlers. Failures to construct the
   * socket are logged and reported through an 'error' event.
   */
  connect() {
    // This attempt supersedes any reconnect that is still scheduled.
    if (this.reconnectHandle) {
      clearTimeout(this.reconnectHandle);
      this.reconnectHandle = null;
    }

    try {
      const socket = new WebSocket(this.url);
      this.socket = socket;

      socket.onopen = () => {
        console.log(`[WebSocketDataSource] Connected to ${this.url}`);
      };

      socket.onmessage = (event) => {
        // Ignore traffic from a socket that stop() already discarded.
        if (this.socket !== socket) return;
        this.handleMessage(event.data);
      };

      socket.onerror = (error) => {
        console.error('[WebSocketDataSource] Error:', error);
        this.emitError(error);
      };

      socket.onclose = () => {
        console.log('[WebSocketDataSource] Connection closed');

        // A close event from a discarded socket can arrive after the source
        // was restarted; reconnecting then would create a duplicate
        // connection, so only the current socket may trigger a reconnect.
        if (this.socket !== socket) return;
        this.socket = null;

        if (this.isRunning) {
          // Auto-reconnect
          this.reconnectHandle = setTimeout(() => {
            this.reconnectHandle = null;
            this.connect();
          }, this.reconnectInterval);
        }
      };
    } catch (error) {
      console.error('[WebSocketDataSource] Failed to connect:', error);
      this.emitError(error);
    }
  }

  /**
   * Parse one incoming message and re-emit it as a 'line' or 'point' event.
   * Unparseable messages are logged and reported through an 'error' event;
   * messages of an unknown shape are logged and dropped.
   *
   * @param {string} data - JSON text.
   */
  handleMessage(data) {
    try {
      const parsed = JSON.parse(data);

      // Expect format: {type: 'line', points: [...]} or {type: 'point', value: ...}
      if (parsed.type === 'line' && parsed.points) {
        this.emitLine(parsed.points, parsed.metadata || {});
      } else if (parsed.type === 'point' && parsed.value !== undefined) {
        this.emitPoint(parsed.value, parsed.timestamp);
      } else {
        console.warn('[WebSocketDataSource] Unknown message format:', parsed);
      }
    } catch (error) {
      console.error('[WebSocketDataSource] Failed to parse message:', error);
      this.emitError(error);
    }
  }

  /**
   * Send a JSON-serialised message if the socket is open.
   *
   * @param {*} data - Value passed to `JSON.stringify`.
   * @returns {boolean} true when the message was handed to the socket,
   *   false when there is no open socket and the message was dropped.
   */
  send(data) {
    if (this.socket && this.socket.readyState === WebSocket.OPEN) {
      this.socket.send(JSON.stringify(data));
      return true;
    }
    return false;
  }
}

/**
 * CSV File data source
 * Reads data from CSV files (for replay/analysis)
 */
export class CSVDataSource extends DataSource {
  /**
   * @param {Object} [config]
   * @param {number} [config.playbackRate=1.0] - Divides the ~16 ms tick interval.
   * @param {boolean} [config.loop=false] - Restart from the first row at the end.
   */
  constructor(config = {}) {
    super(config);
    this.data = []; // Parsed CSV data
    this.currentIndex = 0;
    this.playbackRate = config.playbackRate || 1.0;
    this.loop = config.loop || false;
    this.intervalHandle = null;
  }

  /**
   * Load CSV data from a string
   * Expected format: timestamp,value or x,y format
   *
   * The first line is treated as a header and skipped. Each following row
   * contributes its first two columns as `{timestamp, value}`; rows where
   * either of those is missing or not numeric are dropped. Loading replaces
   * any previous data and rewinds playback to the first row.
   *
   * @param {string} csvString - CSV text.
   */
  loadCSV(csvString) {
    const lines = csvString.trim().split('\n');

    this.data = [];
    // A new dataset starts from its first row, not from wherever the previous
    // playback stopped.
    this.currentIndex = 0;

    for (let i = 1; i < lines.length; i++) {
      const [timestamp, value] = lines[i]
        .split(',')
        .map((v) => parseFloat(v.trim()));
      // Only the two columns that are actually used are validated, so extra
      // label columns or a trailing comma do not discard an otherwise good
      // row. A missing column is undefined, which is neither a number nor NaN.
      const isUsable = (n) => typeof n === 'number' && !Number.isNaN(n);
      if (isUsable(timestamp) && isUsable(value)) {
        this.data.push({ timestamp, value });
      }
    }

    console.log(`[CSVDataSource] Loaded ${this.data.length} data points`);
  }

  /**
   * Begin replaying loaded rows. Does nothing if already running or if no
   * data has been loaded.
   */
  start() {
    if (this.isRunning || this.data.length === 0) return;
    super.start();

    // Play back at specified rate
    this.intervalHandle = setInterval(() => {
      this.emitNextPoint();
    }, 16 / this.playbackRate); // ~60fps adjusted by playback rate
  }

  /**
   * Stop replaying and cancel the periodic timer.
   */
  stop() {
    super.stop();
    if (this.intervalHandle) {
      clearInterval(this.intervalHandle);
      this.intervalHandle = null;
    }
  }

  /**
   * Rewind playback to the first row.
   */
  reset() {
    super.reset();
    this.currentIndex = 0;
  }

  /**
   * Emit the row at `currentIndex` and advance. At the end of the data this
   * either wraps to the first row (`loop`) or stops the source.
   */
  emitNextPoint() {
    if (this.currentIndex >= this.data.length) {
      if (this.loop) {
        this.currentIndex = 0;
      } else {
        this.stop();
        return;
      }
    }

    const point = this.data[this.currentIndex];
    this.emitPoint(point.value, point.timestamp);
    this.currentIndex++;
  }
}

/**
 * Multi-source combiner
 * Combines data from multiple sources
 */
export class CompositeDataSource extends DataSource {
  /**
   * @param {Object} [config]
   * @param {Array} [config.sources] - Child sources; the array is copied.
   * @param {string} [config.combineMode='average'] - 'average', 'sum', 'max' or 'min'.
   */
  constructor(config = {}) {
    super(config);
    // Copy so addSource/removeSource never mutate the caller's array.
    this.sources = [...(config.sources || [])];
    this.combineMode = config.combineMode || 'average'; // 'average', 'sum', 'max', 'min'
    this.pointBuffer = new Map(); // source key => latest point value
    this.subscriptions = new Map(); // source => function removing our listeners
  }

  /**
   * Subscribe to every child source and start it.
   * Does nothing if already running.
   */
  start() {
    if (this.isRunning) return;
    super.start();

    this.sources.forEach((source) => {
      this.subscribeToSource(source);
      source.start();
    });
  }

  /**
   * Stop every child source and drop our listeners on them, so a later
   * start() does not deliver each event more than once.
   */
  stop() {
    super.stop();
    this.sources.forEach((source) => source.stop());
    Array.from(this.subscriptions.keys()).forEach((source) => {
      this.unsubscribeFromSource(source);
    });
  }

  /**
   * Attach 'point' and 'line' listeners to a child source (at most once).
   *
   * @param {DataSource} source
   */
  subscribeToSource(source) {
    if (this.subscriptions.has(source)) return;

    const onPoint = (data) => this.handleSourcePoint(source, data);
    const onLine = (data) => this.handleSourceLine(source, data);
    source.on('point', onPoint);
    source.on('line', onLine);

    this.subscriptions.set(source, () => {
      source.off('point', onPoint);
      source.off('line', onLine);
    });
  }

  /**
   * Detach the listeners installed by subscribeToSource, if any.
   *
   * @param {DataSource} source
   */
  unsubscribeFromSource(source) {
    const unsubscribe = this.subscriptions.get(source);
    if (!unsubscribe) return;
    unsubscribe();
    this.subscriptions.delete(source);
  }

  /**
   * Record the latest value from one source and, once every source has
   * reported at least one value, emit the combined point.
   *
   * @param {*} sourceKey - Key identifying the originating source. Internal
   *   subscriptions pass the source instance itself, which stays valid when
   *   other sources are added or removed.
   * @param {{value: number, timestamp: number}} data - The source's 'point' payload.
   */
  handleSourcePoint(sourceKey, data) {
    this.pointBuffer.set(sourceKey, data.value);

    // If we have data from all sources, combine and emit.
    // Distinct sources are counted because the buffer holds one entry per key.
    const expected = new Set(this.sources).size;
    if (this.pointBuffer.size === expected) {
      const combined = this.combineValues(Array.from(this.pointBuffer.values()));
      this.emitPoint(combined, data.timestamp);
    }
  }

  /**
   * Forward a child's line unchanged.
   *
   * @param {*} sourceKey - Key identifying the originating source (unused).
   * @param {{points: Array, metadata: Object}} data - The source's 'line' payload.
   */
  handleSourceLine(sourceKey, data) {
    // For lines, just pass through for now
    // Could implement line combination if needed
    this.emitLine(data.points, data.metadata);
  }

  /**
   * Reduce the buffered values according to `combineMode`. Unknown modes
   * return the first value.
   *
   * @param {number[]} values
   * @returns {number}
   */
  combineValues(values) {
    switch (this.combineMode) {
      case 'sum':
        return values.reduce((a, b) => a + b, 0);
      case 'average':
        return values.reduce((a, b) => a + b, 0) / values.length;
      case 'max':
        return Math.max(...values);
      case 'min':
        return Math.min(...values);
      default:
        return values[0];
    }
  }

  /**
   * Add a child source. When the composite is already running the source is
   * subscribed to and started immediately.
   *
   * @param {DataSource} source
   */
  addSource(source) {
    this.sources.push(source);
    if (this.isRunning) {
      this.subscribeToSource(source);
      source.start();
    }
  }

  /**
   * Stop and remove a child source, detaching our listeners and discarding
   * its buffered value. Sources that were never added are ignored.
   *
   * @param {DataSource} source
   */
  removeSource(source) {
    const idx = this.sources.indexOf(source);
    if (idx > -1) {
      source.stop();
      this.sources.splice(idx, 1);
      // Keep listeners and the buffered value while another entry for the
      // same instance is still registered.
      if (!this.sources.includes(source)) {
        this.unsubscribeFromSource(source);
        this.pointBuffer.delete(source);
      }
    }
  }
}