summaryrefslogtreecommitdiff
path: root/src/data/source-registry.js
diff options
context:
space:
mode:
Diffstat (limited to 'src/data/source-registry.js')
-rw-r--r--src/data/source-registry.js90
1 files changed, 90 insertions, 0 deletions
diff --git a/src/data/source-registry.js b/src/data/source-registry.js
new file mode 100644
index 0000000..917d06b
--- /dev/null
+++ b/src/data/source-registry.js
@@ -0,0 +1,90 @@
+import { CsvReplaySource } from './csv-replay-source.js';
+import { SyntheticWaveSource } from './synthetic-wave-source.js';
+import { WebSocketSource } from './websocket-source.js';
+
+export class SourceRegistry {
+ constructor(store, bus) {
+ this.store = store;
+ this.bus = bus;
+ this.sources = new Map();
+ this.syncFromState();
+ }
+
+ syncFromState() {
+ const state = this.store.getState();
+ const sourceEntries = Object.entries(state.sources);
+ const activeKeys = new Set(sourceEntries.map(([sourceKey]) => sourceKey));
+
+ for (const [sourceKey, config] of sourceEntries) {
+ const existingSource = this.sources.get(sourceKey);
+
+ if (!existingSource) {
+ const nextSource = this.createSource(sourceKey, config);
+ this.sources.set(sourceKey, nextSource);
+ nextSource.start(state.time.plotTimeMs);
+ continue;
+ }
+
+ if (existingSource.sourceType !== config.type) {
+ existingSource.stop();
+ const replacementSource = this.createSource(sourceKey, config);
+ this.sources.set(sourceKey, replacementSource);
+ replacementSource.start(state.time.plotTimeMs);
+ continue;
+ }
+
+ existingSource.updateConfig(config);
+ }
+
+ for (const [sourceKey, source] of this.sources.entries()) {
+ if (!activeKeys.has(sourceKey)) {
+ source.stop();
+ this.sources.delete(sourceKey);
+ }
+ }
+ }
+
+ createSource(sourceKey, config) {
+ switch (config.type) {
+ case 'csv-replay':
+ return new CsvReplaySource(config);
+ case 'websocket':
+ return new WebSocketSource(config, {
+ onStatusChange: (statusPatch) => {
+ this.store.setState((state) => ({
+ ...state,
+ sources: {
+ ...state.sources,
+ [sourceKey]: {
+ ...state.sources[sourceKey],
+ ...statusPatch,
+ },
+ },
+ }));
+ },
+ });
+ case 'synthetic-wave':
+ default:
+ return new SyntheticWaveSource(config);
+ }
+ }
+
+ update(currentPlotTimeMs) {
+ for (const [sourceKey, source] of this.sources.entries()) {
+ const points = source.update(currentPlotTimeMs);
+ for (const point of points) {
+ this.bus.emit('data:point', {
+ ...point,
+ sourceId: sourceKey,
+ });
+ }
+ }
+ }
+
+ reset() {
+ const startTimeMs = this.store.getState().time.plotTimeMs;
+ for (const source of this.sources.values()) {
+ source.reset(startTimeMs);
+ }
+ }
+}