summaryrefslogtreecommitdiff
path: root/web-timeplot/src/data/source-registry.js
blob: 917d06b397039d8bd5eadce59bcb096d095e3acb (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
import { CsvReplaySource } from './csv-replay-source.js';
import { SyntheticWaveSource } from './synthetic-wave-source.js';
import { WebSocketSource } from './websocket-source.js';

export class SourceRegistry {
    constructor(store, bus) {
        this.store = store;
        this.bus = bus;
        this.sources = new Map();
        this.syncFromState();
    }

    syncFromState() {
        const state = this.store.getState();
        const sourceEntries = Object.entries(state.sources);
        const activeKeys = new Set(sourceEntries.map(([sourceKey]) => sourceKey));

        for (const [sourceKey, config] of sourceEntries) {
            const existingSource = this.sources.get(sourceKey);

            if (!existingSource) {
                const nextSource = this.createSource(sourceKey, config);
                this.sources.set(sourceKey, nextSource);
                nextSource.start(state.time.plotTimeMs);
                continue;
            }

            if (existingSource.sourceType !== config.type) {
                existingSource.stop();
                const replacementSource = this.createSource(sourceKey, config);
                this.sources.set(sourceKey, replacementSource);
                replacementSource.start(state.time.plotTimeMs);
                continue;
            }

            existingSource.updateConfig(config);
        }

        for (const [sourceKey, source] of this.sources.entries()) {
            if (!activeKeys.has(sourceKey)) {
                source.stop();
                this.sources.delete(sourceKey);
            }
        }
    }

    createSource(sourceKey, config) {
        switch (config.type) {
            case 'csv-replay':
                return new CsvReplaySource(config);
            case 'websocket':
                return new WebSocketSource(config, {
                    onStatusChange: (statusPatch) => {
                        this.store.setState((state) => ({
                            ...state,
                            sources: {
                                ...state.sources,
                                [sourceKey]: {
                                    ...state.sources[sourceKey],
                                    ...statusPatch,
                                },
                            },
                        }));
                    },
                });
            case 'synthetic-wave':
            default:
                return new SyntheticWaveSource(config);
        }
    }

    update(currentPlotTimeMs) {
        for (const [sourceKey, source] of this.sources.entries()) {
            const points = source.update(currentPlotTimeMs);
            for (const point of points) {
                this.bus.emit('data:point', {
                    ...point,
                    sourceId: sourceKey,
                });
            }
        }
    }

    reset() {
        const startTimeMs = this.store.getState().time.plotTimeMs;
        for (const source of this.sources.values()) {
            source.reset(startTimeMs);
        }
    }
}