diff options
Diffstat (limited to 'web-timeplot/src/data-sources.js')
| -rw-r--r-- | web-timeplot/src/data-sources.js | 517 |
1 files changed, 517 insertions, 0 deletions
diff --git a/web-timeplot/src/data-sources.js b/web-timeplot/src/data-sources.js new file mode 100644 index 0000000..749a151 --- /dev/null +++ b/web-timeplot/src/data-sources.js @@ -0,0 +1,517 @@ +/** + * Data Sources - Components that generate or provide data to plots + * + * This module implements the data provider side of the architecture. + * Data sources know how to generate or fetch data, but don't know + * anything about visualization. + * + * Architecture: + * - DataSource: Base class with event emitting + * - Specific sources: Implement different data generation strategies + * - Connection: Links sources to plots (see plot-connections.js) + */ + +// Simple EventEmitter (same as in state.js, could be extracted to utils) +class EventEmitter { + constructor() { + this.events = new Map(); + } + + on(event, callback) { + if (!this.events.has(event)) { + this.events.set(event, []); + } + this.events.get(event).push(callback); + return () => this.off(event, callback); + } + + off(event, callback) { + if (!this.events.has(event)) return; + const callbacks = this.events.get(event); + const index = callbacks.indexOf(callback); + if (index > -1) { + callbacks.splice(index, 1); + } + } + + emit(event, data) { + if (!this.events.has(event)) return; + this.events.get(event).forEach(callback => { + try { + callback(data); + } catch (e) { + console.error(`[DataSource] Error in event handler for '${event}':`, e); + } + }); + } +} + +/** + * Base class for all data sources + * + * Events emitted: + * - 'line': {points: Array, timestamp: number, metadata: Object} + * - 'point': {value: number, timestamp: number} + * - 'error': {error: Error} + */ +export class DataSource extends EventEmitter { + constructor(config = {}) { + super(); + this.config = config; + this.isRunning = false; + this.time = 0; + } + + /** + * Start generating/providing data + */ + start() { + this.isRunning = true; + } + + /** + * Stop generating/providing data + */ + stop() { + this.isRunning = false; + } + + /** + * Reset the data source to initial state + */ + reset() { + this.time = 0; + } + + /** + * Emit a complete line of data + */ + emitLine(points, metadata = {}) { + this.emit('line', { + points, + timestamp: metadata.timestamp || Date.now(), + metadata, + }); + } + + /** + * Emit a single data point + */ + emitPoint(value, timestamp = Date.now()) { + this.emit('point', { + value, + timestamp, + }); + } + + /** + * Emit an error + */ + emitError(error) { + this.emit('error', { error }); + } +} + +/** + * Synthetic data source using test generators + * Uses the generators from test-data-generators.js + */ +export class SyntheticDataSource extends DataSource { + constructor(config = {}) { + super(config); + this.generator = config.generator; // Instance of DataGenerator + this.pointsPerLine = config.pointsPerLine || 100; + this.width = config.width || 800; + this.lineInterval = config.lineInterval || 100; // ms between lines + this.intervalHandle = null; + } + + start() { + if (this.isRunning) return; + super.start(); + + // Generate a new line periodically + this.intervalHandle = setInterval(() => { + this.generateAndEmitLine(); + }, this.lineInterval); + + // Generate initial line immediately + this.generateAndEmitLine(); + } + + stop() { + super.stop(); + if (this.intervalHandle) { + clearInterval(this.intervalHandle); + this.intervalHandle = null; + } + } + + generateAndEmitLine() { + if (!this.generator) { + this.emitError(new Error('No generator configured')); + return; + } + + const points = this.generator.generateLine(this.pointsPerLine, this.width); + this.emitLine(points, { + timestamp: Date.now(), + generatorType: this.generator.constructor.name, + }); + } + + setGenerator(generator) { + this.generator = generator; + } +} + +/** + * Function-based data source + * Evaluates a user-provided function to generate data + */ +export class FunctionDataSource extends DataSource { + constructor(config = {}) { + super(config); + // Function should have signature: (x, t) => y + // x: normalized position 0-1 + // t: time in seconds + // returns: y value + this.func = config.func || ((x, t) => Math.sin(x * 10 + t)); + this.pointsPerLine = config.pointsPerLine || 100; + this.width = config.width || 800; + this.amplitude = config.amplitude || 30; + this.lineInterval = config.lineInterval || 100; + this.intervalHandle = null; + } + + start() { + if (this.isRunning) return; + super.start(); + + this.intervalHandle = setInterval(() => { + this.generateAndEmitLine(); + }, this.lineInterval); + + this.generateAndEmitLine(); + } + + stop() { + super.stop(); + if (this.intervalHandle) { + clearInterval(this.intervalHandle); + this.intervalHandle = null; + } + } + + generateAndEmitLine() { + const points = []; + const t = this.time; + + for (let i = 0; i < this.pointsPerLine; i++) { + const x = (i / this.pointsPerLine) * this.width; + const normalizedX = i / this.pointsPerLine; + const y = this.func(normalizedX, t) * this.amplitude; + points.push({ x, y }); + } + + this.emitLine(points, { + timestamp: Date.now(), + time: t, + }); + + this.time += this.lineInterval / 1000; + } + + setFunction(func) { + this.func = func; + } +} + +/** + * Streaming data source + * Emits individual data points that get buffered into lines + */ +export class StreamingDataSource extends DataSource { + constructor(config = {}) { + super(config); + this.generator = config.generator; + this.sampleRate = config.sampleRate || 60; // Samples per second + this.intervalHandle = null; + } + + start() { + if (this.isRunning) return; + super.start(); + + const intervalMs = 1000 / this.sampleRate; + this.intervalHandle = setInterval(() => { + this.generateAndEmitPoint(); + }, intervalMs); + } + + stop() { + super.stop(); + if (this.intervalHandle) { + clearInterval(this.intervalHandle); + this.intervalHandle = null; + } + } + + generateAndEmitPoint() { + if (!this.generator) { + this.emitError(new Error('No generator configured')); + return; + } + + const value = this.generator.sample(); + this.generator.time += 1 / this.generator.sampleRate; + this.emitPoint(value, Date.now()); + } + + setGenerator(generator) { + this.generator = generator; + } +} + +/** + * WebSocket data source (for real data) + * Receives data from a WebSocket connection + */ +export class WebSocketDataSource extends DataSource { + constructor(config = {}) { + super(config); + this.url = config.url; + this.socket = null; + this.reconnectInterval = config.reconnectInterval || 5000; + this.reconnectHandle = null; + } + + start() { + if (this.isRunning) return; + super.start(); + this.connect(); + } + + stop() { + super.stop(); + if (this.socket) { + this.socket.close(); + this.socket = null; + } + if (this.reconnectHandle) { + clearTimeout(this.reconnectHandle); + this.reconnectHandle = null; + } + } + + connect() { + try { + this.socket = new WebSocket(this.url); + + this.socket.onopen = () => { + console.log(`[WebSocketDataSource] Connected to ${this.url}`); + }; + + this.socket.onmessage = (event) => { + this.handleMessage(event.data); + }; + + this.socket.onerror = (error) => { + console.error('[WebSocketDataSource] Error:', error); + this.emitError(error); + }; + + this.socket.onclose = () => { + console.log('[WebSocketDataSource] Connection closed'); + if (this.isRunning) { + // Auto-reconnect + this.reconnectHandle = setTimeout(() => { + this.connect(); + }, this.reconnectInterval); + } + }; + } catch (error) { + console.error('[WebSocketDataSource] Failed to connect:', error); + this.emitError(error); + } + } + + handleMessage(data) { + try { + const parsed = JSON.parse(data); + + // Expect format: {type: 'line', points: [...]} or {type: 'point', value: ...} + if (parsed.type === 'line' && parsed.points) { + this.emitLine(parsed.points, parsed.metadata || {}); + } else if (parsed.type === 'point' && parsed.value !== undefined) { + this.emitPoint(parsed.value, parsed.timestamp); + } else { + console.warn('[WebSocketDataSource] Unknown message format:', parsed); + } + } catch (error) { + console.error('[WebSocketDataSource] Failed to parse message:', error); + this.emitError(error); + } + } + + send(data) { + if (this.socket && this.socket.readyState === WebSocket.OPEN) { + this.socket.send(JSON.stringify(data)); + } + } +} + +/** + * CSV File data source + * Reads data from CSV files (for replay/analysis) + */ +export class CSVDataSource extends DataSource { + constructor(config = {}) { + super(config); + this.data = []; // Parsed CSV data + this.currentIndex = 0; + this.playbackRate = config.playbackRate || 1.0; + this.loop = config.loop || false; + this.intervalHandle = null; + } + + /** + * Load CSV data from a string + * Expected format: timestamp,value or x,y format + */ + loadCSV(csvString) { + const lines = csvString.trim().split('\n'); + const headers = lines[0].split(',').map(h => h.trim()); + + this.data = []; + for (let i = 1; i < lines.length; i++) { + const values = lines[i].split(',').map(v => parseFloat(v.trim())); + if (values.length >= 2 && !values.some(isNaN)) { + this.data.push({ + timestamp: values[0], + value: values[1], + }); + } + } + + console.log(`[CSVDataSource] Loaded ${this.data.length} data points`); + } + + start() { + if (this.isRunning || this.data.length === 0) return; + super.start(); + + // Play back at specified rate + this.intervalHandle = setInterval(() => { + this.emitNextPoint(); + }, 16 / this.playbackRate); // ~60fps adjusted by playback rate + } + + stop() { + super.stop(); + if (this.intervalHandle) { + clearInterval(this.intervalHandle); + this.intervalHandle = null; + } + } + + reset() { + super.reset(); + this.currentIndex = 0; + } + + emitNextPoint() { + if (this.currentIndex >= this.data.length) { + if (this.loop) { + this.currentIndex = 0; + } else { + this.stop(); + return; + } + } + + const point = this.data[this.currentIndex]; + this.emitPoint(point.value, point.timestamp); + this.currentIndex++; + } +} + +/** + * Multi-source combiner + * Combines data from multiple sources + */ +export class CompositeDataSource extends DataSource { + constructor(config = {}) { + super(config); + this.sources = config.sources || []; + this.combineMode = config.combineMode || 'average'; // 'average', 'sum', 'max', 'min' + this.pointBuffer = new Map(); // sourceId => latest point + } + + start() { + if (this.isRunning) return; + super.start(); + + // Subscribe to all sources + this.sources.forEach((source, idx) => { + source.on('point', (data) => { + this.handleSourcePoint(idx, data); + }); + source.on('line', (data) => { + this.handleSourceLine(idx, data); + }); + source.start(); + }); + } + + stop() { + super.stop(); + this.sources.forEach(source => source.stop()); + } + + handleSourcePoint(sourceIdx, data) { + this.pointBuffer.set(sourceIdx, data.value); + + // If we have data from all sources, combine and emit + if (this.pointBuffer.size === this.sources.length) { + const combined = this.combineValues(Array.from(this.pointBuffer.values())); + this.emitPoint(combined, data.timestamp); + } + } + + handleSourceLine(sourceIdx, data) { + // For lines, just pass through for now + // Could implement line combination if needed + this.emitLine(data.points, data.metadata); + } + + combineValues(values) { + switch (this.combineMode) { + case 'sum': + return values.reduce((a, b) => a + b, 0); + case 'average': + return values.reduce((a, b) => a + b, 0) / values.length; + case 'max': + return Math.max(...values); + case 'min': + return Math.min(...values); + default: + return values[0]; + } + } + + addSource(source) { + this.sources.push(source); + if (this.isRunning) { + source.start(); + } + } + + removeSource(source) { + const idx = this.sources.indexOf(source); + if (idx > -1) { + source.stop(); + this.sources.splice(idx, 1); + } + } +} |
