/**
 * Plot Connections - Links data sources to visualization plots
 *
 * This module manages the connection between data sources and plots,
 * handling buffering, timing, and data flow.
 *
 * Connection Types:
 * - DirectConnection: Lines from source → plot (no buffering)
 * - BufferedConnection: Points → buffer → lines → plot
 * - SynchronizedConnection: Multiple sources → synchronized output
 *
 * Sources are expected to expose `on(event, handler)` returning an
 * unsubscribe function, plus `start()` and `stop()`. Plots are expected to
 * expose `addLine(points, metadata)` and a numeric `width`.
 */

/**
 * Base connection class.
 *
 * Subclasses implement `setupSubscriptions()` and push every unsubscribe
 * function they obtain into `this.subscriptions` so `cleanup()` can release
 * them.
 */
class PlotConnection {
  /**
   * @param {Object} source - Data source (`on`, `start`, `stop`)
   * @param {Object} plot - Plot receiving the data
   * @param {Object} [config] - Connection options, stored as `this.config`
   */
  constructor(source, plot, config = {}) {
    this.source = source;
    this.plot = plot;
    this.config = config;
    this.isActive = false;
    this.subscriptions = [];
  }

  /**
   * Activate the connection - start data flow.
   * No-op when already active. If subscribing or starting the source throws,
   * the connection is rolled back to the inactive state and the error is
   * rethrown, so a later connect() can retry.
   */
  connect() {
    if (this.isActive) return;

    this.isActive = true;
    try {
      this.setupSubscriptions();
      this.source.start();
    } catch (err) {
      // Undo partial setup; otherwise the connection would report itself as
      // active while no data can ever flow through it.
      this.cleanup();
      this.isActive = false;
      throw err;
    }
  }

  /**
   * Deactivate the connection - stop data flow.
   * No-op when not active.
   */
  disconnect() {
    if (!this.isActive) return;

    this.isActive = false;
    this.cleanup();
    this.source.stop();
  }

  /**
   * Setup event subscriptions (override in subclasses)
   * @throws {Error} Always, in the base class
   */
  setupSubscriptions() {
    throw new Error('setupSubscriptions() must be implemented by subclass');
  }

  /**
   * Cleanup subscriptions: call every stored unsubscribe function and reset
   * the list.
   */
  cleanup() {
    this.subscriptions.forEach(unsub => unsub());
    this.subscriptions = [];
  }
}

/**
 * Direct connection - passes lines directly from source to plot
 * Use when source emits complete lines of data
 */
export class DirectConnection extends PlotConnection {
  /**
   * Forward each 'line' event to `plot.addLine` and log 'error' events.
   */
  setupSubscriptions() {
    const unsubLine = this.source.on('line', (data) => {
      this.plot.addLine(data.points, data.metadata);
    });

    const unsubError = this.source.on('error', (data) => {
      console.error('[DirectConnection] Source error:', data.error);
    });

    this.subscriptions.push(unsubLine, unsubError);
  }
}

/**
 * Buffered connection - buffers individual points into lines
 * Use when source emits individual data points that need to be assembled
 */
export class BufferedConnection extends PlotConnection {
  /**
   * @param {Object} source - Data source emitting 'point' events
   * @param {Object} plot - Plot receiving assembled lines
   * @param {Object} [config]
   * @param {number} [config.bufferSize=100] - Points per line before a flush
   * @param {number} [config.bufferTimeout=1000] - Max ms between flushes
   * @param {boolean} [config.autoFlush=true] - Set to false to disable the
   *   timer-driven flush
   */
  constructor(source, plot, config = {}) {
    super(source, plot, config);

    this.buffer = [];
    this.bufferSize = config.bufferSize || 100;
    this.bufferTimeout = config.bufferTimeout || 1000; // ms
    this.lastFlush = Date.now();
    // The auto-flush timer is created on connect (see setupSubscriptions),
    // not here: a constructor-started interval would leak when the
    // connection is never connected and would not come back after a
    // disconnect()/connect() cycle.
    this.flushHandle = null;
  }

  /**
   * Buffer each 'point' event, log 'error' events, and start the auto-flush
   * timer unless `config.autoFlush` is false.
   */
  setupSubscriptions() {
    const unsubPoint = this.source.on('point', (data) => {
      this.addToBuffer(data);
    });

    const unsubError = this.source.on('error', (data) => {
      console.error('[BufferedConnection] Source error:', data.error);
    });

    this.subscriptions.push(unsubPoint, unsubError);

    if (this.config.autoFlush !== false) {
      this.startAutoFlush();
    }
  }

  /**
   * Append a data point and flush once the buffer holds `bufferSize` points.
   * @param {Object} data - Point payload; `data.value` becomes the y value
   */
  addToBuffer(data) {
    this.buffer.push(data);

    // Flush if buffer is full
    if (this.buffer.length >= this.bufferSize) {
      this.flush();
    }
  }

  /**
   * Emit the buffered points as one line and reset the buffer.
   * No-op when the buffer is empty. The line's `timestamp` metadata is the
   * time of the previous flush (or of construction for the first line).
   */
  flush() {
    if (this.buffer.length === 0) return;

    // Convert buffer to line points, spreading them evenly over
    // [0, plot.width) by index.
    const points = this.buffer.map((data, idx) => {
      const x = (idx / this.buffer.length) * this.plot.width;
      return { x, y: data.value };
    });

    this.plot.addLine(points, {
      timestamp: this.lastFlush,
      pointCount: this.buffer.length,
    });

    this.buffer = [];
    this.lastFlush = Date.now();
  }

  /**
   * Start the periodic flush check. Idempotent: calling it while a timer is
   * already running does not create a second one.
   */
  startAutoFlush() {
    if (this.flushHandle !== null) return;

    this.flushHandle = setInterval(() => {
      const timeSinceLastFlush = Date.now() - this.lastFlush;
      if (timeSinceLastFlush >= this.bufferTimeout && this.buffer.length > 0) {
        this.flush();
      }
    }, 100); // Check every 100ms
  }

  /**
   * Release subscriptions and stop the auto-flush timer.
   */
  cleanup() {
    super.cleanup();

    if (this.flushHandle !== null) {
      clearInterval(this.flushHandle);
      this.flushHandle = null;
    }
  }
}

/**
 * Synchronized connection - synchronizes multiple sources to one plot
 * Useful for combining multiple data streams
 */
export class SynchronizedConnection extends PlotConnection {
  /**
   * @param {Object[]|Object} sources - Sources to combine; a single source
   *   is accepted and treated as a one-element list
   * @param {Object} plot - Plot receiving the output
   * @param {Object} [config]
   * @param {string} [config.syncMode='wait-for-all'] - 'wait-for-all' or
   *   'first-available'
   */
  constructor(sources, plot, config = {}) {
    super(null, plot, config); // No single source

    // ConnectionManager.connect() forwards whatever it was given as
    // `source`, which may be a single source rather than an array.
    this.sources = Array.isArray(sources) ? sources : [sources];
    this.syncMode = config.syncMode || 'wait-for-all'; // 'wait-for-all', 'first-available'
    this.lineBuffers = new Map(); // source index => latest line
  }

  /**
   * Subscribe to and start every source. No-op when already active. If any
   * source fails to subscribe or start, the sources started so far are
   * stopped, subscriptions are released, and the error is rethrown.
   */
  connect() {
    if (this.isActive) return;

    this.isActive = true;
    const started = [];

    try {
      this.sources.forEach((source, idx) => {
        const unsubLine = source.on('line', (data) => {
          this.handleSourceLine(idx, data);
        });

        const unsubError = source.on('error', (data) => {
          console.error(`[SynchronizedConnection] Source ${idx} error:`, data.error);
        });

        this.subscriptions.push(unsubLine, unsubError);
        source.start();
        started.push(source);
      });
    } catch (err) {
      this.cleanup();
      started.forEach(source => source.stop());
      this.isActive = false;
      throw err;
    }
  }

  /**
   * Release subscriptions and stop every source. No-op when not active.
   */
  disconnect() {
    if (!this.isActive) return;

    this.isActive = false;
    this.cleanup();
    this.sources.forEach(source => source.stop());
  }

  /**
   * Release subscriptions and drop any partially collected lines, so a
   * later reconnect cannot combine fresh lines with stale ones.
   */
  cleanup() {
    super.cleanup();
    this.lineBuffers.clear();
  }

  /**
   * Record the latest line of one source and emit according to `syncMode`.
   * @param {number} sourceIdx - Index of the emitting source in `sources`
   * @param {Object} data - Line payload with `points` and `metadata`
   */
  handleSourceLine(sourceIdx, data) {
    this.lineBuffers.set(sourceIdx, data);

    if (this.syncMode === 'wait-for-all') {
      // Wait until we have data from all sources
      if (this.lineBuffers.size === this.sources.length) {
        this.emitSynchronized();
      }
    } else if (this.syncMode === 'first-available') {
      // Emit immediately, tagging the line with its source index
      this.plot.addLine(data.points, {
        ...data.metadata,
        sourceIdx,
      });
    }
  }

  /**
   * Emit one line for the completed round and clear the collected lines.
   */
  emitSynchronized() {
    // For now, just emit the first source's line
    // Could implement more sophisticated merging
    const firstLine = this.lineBuffers.get(0);
    if (firstLine) {
      this.plot.addLine(firstLine.points, firstLine.metadata);
    }

    this.lineBuffers.clear();
  }
}

/**
 * Connection Manager - manages multiple connections
 */
export class ConnectionManager {
  constructor() {
    this.connections = new Map(); // connectionId => connection
    this.nextId = 0;
  }

  /**
   * Create, activate and register a connection.
   * @param {Object|Object[]} source - Source (or sources, for 'synchronized')
   * @param {Object} plot - Plot receiving the data
   * @param {Object} [config] - Passed to the connection; `config.type`
   *   selects 'direct' (default), 'buffered' or 'synchronized'
   * @returns {number} connectionId
   * @throws {Error} When `config.type` is not a known connection type
   */
  connect(source, plot, config = {}) {
    const type = config.type || 'direct';

    let connection;
    switch (type) {
      case 'direct':
        connection = new DirectConnection(source, plot, config);
        break;
      case 'buffered':
        connection = new BufferedConnection(source, plot, config);
        break;
      case 'synchronized':
        connection = new SynchronizedConnection(source, plot, config);
        break;
      default:
        throw new Error(`Unknown connection type: ${type}`);
    }

    // Activate before registering so a connection that fails to connect
    // never lingers in the registry.
    connection.connect();

    const id = this.nextId++;
    this.connections.set(id, connection);

    return id;
  }

  /**
   * Disconnect and remove a connection. Unknown ids are ignored.
   * @param {number} connectionId - Id returned by connect()
   */
  disconnect(connectionId) {
    const connection = this.connections.get(connectionId);
    if (connection) {
      connection.disconnect();
      this.connections.delete(connectionId);
    }
  }

  /**
   * Disconnect all connections
   */
  disconnectAll() {
    this.connections.forEach(connection => connection.disconnect());
    this.connections.clear();
  }

  /**
   * Get statistics about connections
   * @returns {Object} `{ activeConnections, connections }` where
   *   `activeConnections` is the number of registered connections and
   *   `connections` lists `{ id, isActive, type }` per connection
   */
  getStats() {
    return {
      activeConnections: this.connections.size,
      connections: Array.from(this.connections.entries()).map(([id, conn]) => ({
        id,
        isActive: conn.isActive,
        type: conn.constructor.name,
      })),
    };
  }
}

/**
 * Helper functions for common connection patterns
 *
 * NOTE(review): these helpers load their dependencies with CommonJS
 * `require` while this file uses ES `export` syntax. That only works when
 * the bundler/loader provides `require` - confirm the build setup, or
 * convert to static imports.
 */

/**
 * Connect a synthetic data source to a plot
 * @param {DataGenerator} generator - Test data generator instance
 * @param {TimeSeriesPlot} plot - Plot to display data
 * @param {Object} config - Configuration options
 * @returns {DirectConnection} The connection instance
 */
export function connectSyntheticData(generator, plot, config = {}) {
  const { SyntheticDataSource } = require('./data-sources.js');

  const source = new SyntheticDataSource({
    generator,
    pointsPerLine: config.pointsPerLine || 100,
    width: plot.width,
    lineInterval: config.lineInterval || 100,
  });

  const connection = new DirectConnection(source, plot, config);
  connection.connect();

  return connection;
}

/**
 * Connect a function-based source to a plot
 * @param {Function} func - Function (x, t) => y
 * @param {TimeSeriesPlot} plot - Plot to display data
 * @param {Object} config - Configuration options
 * @returns {DirectConnection} The connection instance
 */
export function connectFunction(func, plot, config = {}) {
  const { FunctionDataSource } = require('./data-sources.js');

  const source = new FunctionDataSource({
    func,
    pointsPerLine: config.pointsPerLine || 100,
    width: plot.width,
    amplitude: config.amplitude || 30,
    lineInterval: config.lineInterval || 100,
  });

  const connection = new DirectConnection(source, plot, config);
  connection.connect();

  return connection;
}

/**
 * Connect a streaming source to a plot with buffering
 * @param {DataGenerator} generator - Test data generator instance
 * @param {TimeSeriesPlot} plot - Plot to display data
 * @param {Object} config - Configuration options
 * @returns {BufferedConnection} The connection instance
 */
export function connectStreamingData(generator, plot, config = {}) {
  const { StreamingDataSource } = require('./data-sources.js');

  const source = new StreamingDataSource({
    generator,
    sampleRate: config.sampleRate || 60,
  });

  // Forward the caller's config (e.g. `autoFlush: false`) instead of
  // dropping everything except the two buffer settings.
  const connection = new BufferedConnection(source, plot, {
    ...config,
    bufferSize: config.bufferSize || 100,
    bufferTimeout: config.bufferTimeout || 1000,
  });
  connection.connect();

  return connection;
}

/**
 * Quick setup: Create a plot with a data source in one call
 * @param {Application} app - PixiJS application
 * @param {Object} plotConfig - Plot configuration
 * @param {Object} sourceConfig - Source configuration
 * @returns {Object} {plot, source, connection}
 */
export function createConnectedPlot(app, plotConfig, sourceConfig) {
  const { TimeSeriesPlot } = require('./timeseries-plot.js');
  const { SyntheticDataSource } = require('./data-sources.js');

  const plot = new TimeSeriesPlot(plotConfig);
  app.stage.addChild(plot.container);

  const source = new SyntheticDataSource({
    generator: sourceConfig.generator,
    // Default: ~8 pixels per point.
    // NOTE(review): this is fractional when width is not a multiple of 8 -
    // confirm SyntheticDataSource accepts a non-integer pointsPerLine.
    pointsPerLine: plotConfig.width / 8,
    width: plotConfig.width,
    lineInterval: sourceConfig.lineInterval || 100,
  });

  const connection = new DirectConnection(source, plot);
  connection.connect();

  return { plot, source, connection };
}