diff options
Diffstat (limited to 'web-timeplot/src/plot-connections.js')
| -rw-r--r-- | web-timeplot/src/plot-connections.js | 392 |
1 files changed, 392 insertions, 0 deletions
diff --git a/web-timeplot/src/plot-connections.js b/web-timeplot/src/plot-connections.js new file mode 100644 index 0000000..0e96dd8 --- /dev/null +++ b/web-timeplot/src/plot-connections.js @@ -0,0 +1,392 @@ +/** + * Plot Connections - Links data sources to visualization plots + * + * This module manages the connection between data sources and plots, + * handling buffering, timing, and data flow. + * + * Connection Types: + * - DirectConnection: Lines from source → plot (no buffering) + * - BufferedConnection: Points → buffer → lines → plot + * - SynchronizedConnection: Multiple sources → synchronized output + */ + +/** + * Base connection class + */ +class PlotConnection { + constructor(source, plot, config = {}) { + this.source = source; + this.plot = plot; + this.config = config; + this.isActive = false; + this.subscriptions = []; + } + + /** + * Activate the connection - start data flow + */ + connect() { + if (this.isActive) return; + this.isActive = true; + this.setupSubscriptions(); + this.source.start(); + } + + /** + * Deactivate the connection - stop data flow + */ + disconnect() { + if (!this.isActive) return; + this.isActive = false; + this.cleanup(); + this.source.stop(); + } + + /** + * Setup event subscriptions (override in subclasses) + */ + setupSubscriptions() { + throw new Error('setupSubscriptions() must be implemented by subclass'); + } + + /** + * Cleanup subscriptions + */ + cleanup() { + this.subscriptions.forEach(unsub => unsub()); + this.subscriptions = []; + } +} + +/** + * Direct connection - passes lines directly from source to plot + * Use when source emits complete lines of data + */ +export class DirectConnection extends PlotConnection { + setupSubscriptions() { + const unsubLine = this.source.on('line', (data) => { + this.plot.addLine(data.points, data.metadata); + }); + + const unsubError = this.source.on('error', (data) => { + console.error('[DirectConnection] Source error:', data.error); + }); + + this.subscriptions.push(unsubLine, unsubError); + } +} + +/** + * Buffered connection - buffers individual points into lines + * Use when source emits individual data points that need to be assembled + */ +export class BufferedConnection extends PlotConnection { + constructor(source, plot, config = {}) { + super(source, plot, config); + this.buffer = []; + this.bufferSize = config.bufferSize || 100; + this.bufferTimeout = config.bufferTimeout || 1000; // ms + this.lastFlush = Date.now(); + this.flushHandle = null; + + // Start auto-flush timer + if (config.autoFlush !== false) { + this.startAutoFlush(); + } + } + + setupSubscriptions() { + const unsubPoint = this.source.on('point', (data) => { + this.addToBuffer(data); + }); + + const unsubError = this.source.on('error', (data) => { + console.error('[BufferedConnection] Source error:', data.error); + }); + + this.subscriptions.push(unsubPoint, unsubError); + } + + addToBuffer(data) { + this.buffer.push(data); + + // Flush if buffer is full + if (this.buffer.length >= this.bufferSize) { + this.flush(); + } + } + + flush() { + if (this.buffer.length === 0) return; + + // Convert buffer to line points + const points = this.buffer.map((data, idx) => { + const x = (idx / this.buffer.length) * this.plot.width; + return { x, y: data.value }; + }); + + this.plot.addLine(points, { + timestamp: this.lastFlush, + pointCount: this.buffer.length, + }); + + this.buffer = []; + this.lastFlush = Date.now(); + } + + startAutoFlush() { + this.flushHandle = setInterval(() => { + const timeSinceLastFlush = Date.now() - this.lastFlush; + if (timeSinceLastFlush >= this.bufferTimeout && this.buffer.length > 0) { + this.flush(); + } + }, 100); // Check every 100ms + } + + cleanup() { + super.cleanup(); + if (this.flushHandle) { + clearInterval(this.flushHandle); + this.flushHandle = null; + } + } +} + +/** + * Synchronized connection - synchronizes multiple sources to one plot + * Useful for combining multiple data streams + */ +export class SynchronizedConnection extends PlotConnection { + constructor(sources, plot, config = {}) { + super(null, plot, config); // No single source + this.sources = sources; + this.syncMode = config.syncMode || 'wait-for-all'; // 'wait-for-all', 'first-available' + this.lineBuffers = new Map(); // sourceId => latest line + } + + connect() { + if (this.isActive) return; + this.isActive = true; + + this.sources.forEach((source, idx) => { + const unsubLine = source.on('line', (data) => { + this.handleSourceLine(idx, data); + }); + + const unsubError = source.on('error', (data) => { + console.error(`[SynchronizedConnection] Source ${idx} error:`, data.error); + }); + + this.subscriptions.push(unsubLine, unsubError); + source.start(); + }); + } + + disconnect() { + if (!this.isActive) return; + this.isActive = false; + this.cleanup(); + this.sources.forEach(source => source.stop()); + } + + handleSourceLine(sourceIdx, data) { + this.lineBuffers.set(sourceIdx, data); + + if (this.syncMode === 'wait-for-all') { + // Wait until we have data from all sources + if (this.lineBuffers.size === this.sources.length) { + this.emitSynchronized(); + } + } else if (this.syncMode === 'first-available') { + // Emit immediately + this.plot.addLine(data.points, { + ...data.metadata, + sourceIdx, + }); + } + } + + emitSynchronized() { + // For now, just emit the first source's line + // Could implement more sophisticated merging + const firstLine = this.lineBuffers.get(0); + if (firstLine) { + this.plot.addLine(firstLine.points, firstLine.metadata); + } + this.lineBuffers.clear(); + } +} + +/** + * Connection Manager - manages multiple connections + */ +export class ConnectionManager { + constructor() { + this.connections = new Map(); // connectionId => connection + this.nextId = 0; + } + + /** + * Create and register a connection + * @returns {number} connectionId + */ + connect(source, plot, config = {}) { + const type = config.type || 'direct'; + let connection; + + switch (type) { + case 'direct': + connection = new DirectConnection(source, plot, config); + break; + case 'buffered': + connection = new BufferedConnection(source, plot, config); + break; + case 'synchronized': + connection = new SynchronizedConnection(source, plot, config); + break; + default: + throw new Error(`Unknown connection type: ${type}`); + } + + const id = this.nextId++; + this.connections.set(id, connection); + connection.connect(); + + return id; + } + + /** + * Disconnect and remove a connection + */ + disconnect(connectionId) { + const connection = this.connections.get(connectionId); + if (connection) { + connection.disconnect(); + this.connections.delete(connectionId); + } + } + + /** + * Disconnect all connections + */ + disconnectAll() { + this.connections.forEach(connection => connection.disconnect()); + this.connections.clear(); + } + + /** + * Get statistics about connections + */ + getStats() { + return { + activeConnections: this.connections.size, + connections: Array.from(this.connections.entries()).map(([id, conn]) => ({ + id, + isActive: conn.isActive, + type: conn.constructor.name, + })), + }; + } +} + +/** + * Helper functions for common connection patterns + */ + +/** + * Connect a synthetic data source to a plot + * @param {DataGenerator} generator - Test data generator instance + * @param {TimeSeriesPlot} plot - Plot to display data + * @param {Object} config - Configuration options + * @returns {DirectConnection} The connection instance + */ +export function connectSyntheticData(generator, plot, config = {}) { + const { SyntheticDataSource } = require('./data-sources.js'); + + const source = new SyntheticDataSource({ + generator, + pointsPerLine: config.pointsPerLine || 100, + width: plot.width, + lineInterval: config.lineInterval || 100, + }); + + const connection = new DirectConnection(source, plot, config); + connection.connect(); + + return connection; +} + +/** + * Connect a function-based source to a plot + * @param {Function} func - Function (x, t) => y + * @param {TimeSeriesPlot} plot - Plot to display data + * @param {Object} config - Configuration options + * @returns {DirectConnection} The connection instance + */ +export function connectFunction(func, plot, config = {}) { + const { FunctionDataSource } = require('./data-sources.js'); + + const source = new FunctionDataSource({ + func, + pointsPerLine: config.pointsPerLine || 100, + width: plot.width, + amplitude: config.amplitude || 30, + lineInterval: config.lineInterval || 100, + }); + + const connection = new DirectConnection(source, plot, config); + connection.connect(); + + return connection; +} + +/** + * Connect a streaming source to a plot with buffering + * @param {DataGenerator} generator - Test data generator instance + * @param {TimeSeriesPlot} plot - Plot to display data + * @param {Object} config - Configuration options + * @returns {BufferedConnection} The connection instance + */ +export function connectStreamingData(generator, plot, config = {}) { + const { StreamingDataSource } = require('./data-sources.js'); + + const source = new StreamingDataSource({ + generator, + sampleRate: config.sampleRate || 60, + }); + + const connection = new BufferedConnection(source, plot, { + bufferSize: config.bufferSize || 100, + bufferTimeout: config.bufferTimeout || 1000, + }); + connection.connect(); + + return connection; +} + +/** + * Quick setup: Create a plot with a data source in one call + * @param {Application} app - PixiJS application + * @param {Object} plotConfig - Plot configuration + * @param {Object} sourceConfig - Source configuration + * @returns {Object} {plot, source, connection} + */ +export function createConnectedPlot(app, plotConfig, sourceConfig) { + const { TimeSeriesPlot } = require('./timeseries-plot.js'); + const { SyntheticDataSource } = require('./data-sources.js'); + + const plot = new TimeSeriesPlot(plotConfig); + app.stage.addChild(plot.container); + + const source = new SyntheticDataSource({ + generator: sourceConfig.generator, + pointsPerLine: plotConfig.width / 8, // Default: ~8 pixels per point + width: plotConfig.width, + lineInterval: sourceConfig.lineInterval || 100, + }); + + const connection = new DirectConnection(source, plot); + connection.connect(); + + return { plot, source, connection }; +} |
