summaryrefslogtreecommitdiff
path: root/src/plot-connections.js
diff options
context:
space:
mode:
Diffstat (limited to 'src/plot-connections.js')
-rw-r--r--src/plot-connections.js392
1 files changed, 392 insertions, 0 deletions
diff --git a/src/plot-connections.js b/src/plot-connections.js
new file mode 100644
index 0000000..0e96dd8
--- /dev/null
+++ b/src/plot-connections.js
@@ -0,0 +1,392 @@
+/**
+ * Plot Connections - Links data sources to visualization plots
+ *
+ * This module manages the connection between data sources and plots,
+ * handling buffering, timing, and data flow.
+ *
+ * Connection Types:
+ * - DirectConnection: Lines from source → plot (no buffering)
+ * - BufferedConnection: Points → buffer → lines → plot
+ * - SynchronizedConnection: Multiple sources → synchronized output
+ */
+
+/**
+ * Base connection class
+ */
+class PlotConnection {
+ constructor(source, plot, config = {}) {
+ this.source = source;
+ this.plot = plot;
+ this.config = config;
+ this.isActive = false;
+ this.subscriptions = [];
+ }
+
+ /**
+ * Activate the connection - start data flow
+ */
+ connect() {
+ if (this.isActive) return;
+ this.isActive = true;
+ this.setupSubscriptions();
+ this.source.start();
+ }
+
+ /**
+ * Deactivate the connection - stop data flow
+ */
+ disconnect() {
+ if (!this.isActive) return;
+ this.isActive = false;
+ this.cleanup();
+ this.source.stop();
+ }
+
+ /**
+ * Setup event subscriptions (override in subclasses)
+ */
+ setupSubscriptions() {
+ throw new Error('setupSubscriptions() must be implemented by subclass');
+ }
+
+ /**
+ * Cleanup subscriptions
+ */
+ cleanup() {
+ this.subscriptions.forEach(unsub => unsub());
+ this.subscriptions = [];
+ }
+}
+
+/**
+ * Direct connection - passes lines directly from source to plot
+ * Use when source emits complete lines of data
+ */
+export class DirectConnection extends PlotConnection {
+ setupSubscriptions() {
+ const unsubLine = this.source.on('line', (data) => {
+ this.plot.addLine(data.points, data.metadata);
+ });
+
+ const unsubError = this.source.on('error', (data) => {
+ console.error('[DirectConnection] Source error:', data.error);
+ });
+
+ this.subscriptions.push(unsubLine, unsubError);
+ }
+}
+
+/**
+ * Buffered connection - buffers individual points into lines
+ * Use when source emits individual data points that need to be assembled
+ */
+export class BufferedConnection extends PlotConnection {
+ constructor(source, plot, config = {}) {
+ super(source, plot, config);
+ this.buffer = [];
+ this.bufferSize = config.bufferSize || 100;
+ this.bufferTimeout = config.bufferTimeout || 1000; // ms
+ this.lastFlush = Date.now();
+ this.flushHandle = null;
+
+ // Start auto-flush timer
+ if (config.autoFlush !== false) {
+ this.startAutoFlush();
+ }
+ }
+
+ setupSubscriptions() {
+ const unsubPoint = this.source.on('point', (data) => {
+ this.addToBuffer(data);
+ });
+
+ const unsubError = this.source.on('error', (data) => {
+ console.error('[BufferedConnection] Source error:', data.error);
+ });
+
+ this.subscriptions.push(unsubPoint, unsubError);
+ }
+
+ addToBuffer(data) {
+ this.buffer.push(data);
+
+ // Flush if buffer is full
+ if (this.buffer.length >= this.bufferSize) {
+ this.flush();
+ }
+ }
+
+ flush() {
+ if (this.buffer.length === 0) return;
+
+ // Convert buffer to line points
+ const points = this.buffer.map((data, idx) => {
+ const x = (idx / this.buffer.length) * this.plot.width;
+ return { x, y: data.value };
+ });
+
+ this.plot.addLine(points, {
+ timestamp: this.lastFlush,
+ pointCount: this.buffer.length,
+ });
+
+ this.buffer = [];
+ this.lastFlush = Date.now();
+ }
+
+ startAutoFlush() {
+ this.flushHandle = setInterval(() => {
+ const timeSinceLastFlush = Date.now() - this.lastFlush;
+ if (timeSinceLastFlush >= this.bufferTimeout && this.buffer.length > 0) {
+ this.flush();
+ }
+ }, 100); // Check every 100ms
+ }
+
+ cleanup() {
+ super.cleanup();
+ if (this.flushHandle) {
+ clearInterval(this.flushHandle);
+ this.flushHandle = null;
+ }
+ }
+}
+
+/**
+ * Synchronized connection - synchronizes multiple sources to one plot
+ * Useful for combining multiple data streams
+ */
+export class SynchronizedConnection extends PlotConnection {
+ constructor(sources, plot, config = {}) {
+ super(null, plot, config); // No single source
+ this.sources = sources;
+ this.syncMode = config.syncMode || 'wait-for-all'; // 'wait-for-all', 'first-available'
+ this.lineBuffers = new Map(); // sourceId => latest line
+ }
+
+ connect() {
+ if (this.isActive) return;
+ this.isActive = true;
+
+ this.sources.forEach((source, idx) => {
+ const unsubLine = source.on('line', (data) => {
+ this.handleSourceLine(idx, data);
+ });
+
+ const unsubError = source.on('error', (data) => {
+ console.error(`[SynchronizedConnection] Source ${idx} error:`, data.error);
+ });
+
+ this.subscriptions.push(unsubLine, unsubError);
+ source.start();
+ });
+ }
+
+ disconnect() {
+ if (!this.isActive) return;
+ this.isActive = false;
+ this.cleanup();
+ this.sources.forEach(source => source.stop());
+ }
+
+ handleSourceLine(sourceIdx, data) {
+ this.lineBuffers.set(sourceIdx, data);
+
+ if (this.syncMode === 'wait-for-all') {
+ // Wait until we have data from all sources
+ if (this.lineBuffers.size === this.sources.length) {
+ this.emitSynchronized();
+ }
+ } else if (this.syncMode === 'first-available') {
+ // Emit immediately
+ this.plot.addLine(data.points, {
+ ...data.metadata,
+ sourceIdx,
+ });
+ }
+ }
+
+ emitSynchronized() {
+ // For now, just emit the first source's line
+ // Could implement more sophisticated merging
+ const firstLine = this.lineBuffers.get(0);
+ if (firstLine) {
+ this.plot.addLine(firstLine.points, firstLine.metadata);
+ }
+ this.lineBuffers.clear();
+ }
+}
+
+/**
+ * Connection Manager - manages multiple connections
+ */
+export class ConnectionManager {
+ constructor() {
+ this.connections = new Map(); // connectionId => connection
+ this.nextId = 0;
+ }
+
+ /**
+ * Create and register a connection
+ * @returns {number} connectionId
+ */
+ connect(source, plot, config = {}) {
+ const type = config.type || 'direct';
+ let connection;
+
+ switch (type) {
+ case 'direct':
+ connection = new DirectConnection(source, plot, config);
+ break;
+ case 'buffered':
+ connection = new BufferedConnection(source, plot, config);
+ break;
+ case 'synchronized':
+ connection = new SynchronizedConnection(source, plot, config);
+ break;
+ default:
+ throw new Error(`Unknown connection type: ${type}`);
+ }
+
+ const id = this.nextId++;
+ this.connections.set(id, connection);
+ connection.connect();
+
+ return id;
+ }
+
+ /**
+ * Disconnect and remove a connection
+ */
+ disconnect(connectionId) {
+ const connection = this.connections.get(connectionId);
+ if (connection) {
+ connection.disconnect();
+ this.connections.delete(connectionId);
+ }
+ }
+
+ /**
+ * Disconnect all connections
+ */
+ disconnectAll() {
+ this.connections.forEach(connection => connection.disconnect());
+ this.connections.clear();
+ }
+
+ /**
+ * Get statistics about connections
+ */
+ getStats() {
+ return {
+ activeConnections: this.connections.size,
+ connections: Array.from(this.connections.entries()).map(([id, conn]) => ({
+ id,
+ isActive: conn.isActive,
+ type: conn.constructor.name,
+ })),
+ };
+ }
+}
+
+/**
+ * Helper functions for common connection patterns
+ */
+
+/**
+ * Connect a synthetic data source to a plot
+ * @param {DataGenerator} generator - Test data generator instance
+ * @param {TimeSeriesPlot} plot - Plot to display data
+ * @param {Object} config - Configuration options
+ * @returns {DirectConnection} The connection instance
+ */
+export function connectSyntheticData(generator, plot, config = {}) {
+ const { SyntheticDataSource } = require('./data-sources.js');
+
+ const source = new SyntheticDataSource({
+ generator,
+ pointsPerLine: config.pointsPerLine || 100,
+ width: plot.width,
+ lineInterval: config.lineInterval || 100,
+ });
+
+ const connection = new DirectConnection(source, plot, config);
+ connection.connect();
+
+ return connection;
+}
+
+/**
+ * Connect a function-based source to a plot
+ * @param {Function} func - Function (x, t) => y
+ * @param {TimeSeriesPlot} plot - Plot to display data
+ * @param {Object} config - Configuration options
+ * @returns {DirectConnection} The connection instance
+ */
+export function connectFunction(func, plot, config = {}) {
+ const { FunctionDataSource } = require('./data-sources.js');
+
+ const source = new FunctionDataSource({
+ func,
+ pointsPerLine: config.pointsPerLine || 100,
+ width: plot.width,
+ amplitude: config.amplitude || 30,
+ lineInterval: config.lineInterval || 100,
+ });
+
+ const connection = new DirectConnection(source, plot, config);
+ connection.connect();
+
+ return connection;
+}
+
+/**
+ * Connect a streaming source to a plot with buffering
+ * @param {DataGenerator} generator - Test data generator instance
+ * @param {TimeSeriesPlot} plot - Plot to display data
+ * @param {Object} config - Configuration options
+ * @returns {BufferedConnection} The connection instance
+ */
+export function connectStreamingData(generator, plot, config = {}) {
+ const { StreamingDataSource } = require('./data-sources.js');
+
+ const source = new StreamingDataSource({
+ generator,
+ sampleRate: config.sampleRate || 60,
+ });
+
+ const connection = new BufferedConnection(source, plot, {
+ bufferSize: config.bufferSize || 100,
+ bufferTimeout: config.bufferTimeout || 1000,
+ });
+ connection.connect();
+
+ return connection;
+}
+
+/**
+ * Quick setup: Create a plot with a data source in one call
+ * @param {Application} app - PixiJS application
+ * @param {Object} plotConfig - Plot configuration
+ * @param {Object} sourceConfig - Source configuration
+ * @returns {Object} {plot, source, connection}
+ */
+export function createConnectedPlot(app, plotConfig, sourceConfig) {
+ const { TimeSeriesPlot } = require('./timeseries-plot.js');
+ const { SyntheticDataSource } = require('./data-sources.js');
+
+ const plot = new TimeSeriesPlot(plotConfig);
+ app.stage.addChild(plot.container);
+
+ const source = new SyntheticDataSource({
+ generator: sourceConfig.generator,
+ pointsPerLine: plotConfig.width / 8, // Default: ~8 pixels per point
+ width: plotConfig.width,
+ lineInterval: sourceConfig.lineInterval || 100,
+ });
+
+ const connection = new DirectConnection(source, plot);
+ connection.connect();
+
+ return { plot, source, connection };
+}