From 6fdc29b2263ebeaee6e2765dd60bf81ccda22d80 Mon Sep 17 00:00:00 2001 From: MysteryCode Date: Mon, 22 Jul 2024 11:07:15 +0200 Subject: [PATCH] implement htp and ltp merging receivers (#58) Co-authored-by: Kyle Hensel --- src/index.ts | 2 + src/merging.ts | 237 +++++++++++++++++++++++++++++++++---------- test/merging.test.ts | 135 ++++++++++++++++++++++++ 3 files changed, 318 insertions(+), 56 deletions(-) create mode 100644 test/merging.test.ts diff --git a/src/index.ts b/src/index.ts index 4e7b8f5..0831847 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,3 +1,5 @@ export * from './packet'; export * from './receiver'; +// eslint-disable-next-line camelcase +export { MergingReceiver as unstable_MergingReceiver } from './merging'; export * from './sender'; diff --git a/src/merging.ts b/src/merging.ts index 7395fca..f5b4fc0 100644 --- a/src/merging.ts +++ b/src/merging.ts @@ -1,27 +1,65 @@ -import { performance } from 'node:perf_hooks'; -import { Packet } from './packet'; +import { performance } from 'perf_hooks'; +import type { Packet } from './packet'; import { Receiver } from './receiver'; import type { Payload } from './util'; -interface Universe { - lastData: Payload; - servers: Map; -} - -interface PacketWithTimestamp { - readonly packet: Packet; - readonly lastTimestamp: number; -} +/** + * @deprecated CAUTION: This feature is experimental, + * and has not been thoroughly tested. It may not behave + * correctly. There is no guarantee that it adheres to + * the E1.33 standard. + */ +export namespace MergingReceiver { + /** See {@link Props.mode here} for docs */ + export type Mode = 'HTP' | 'LTP'; -export namespace ReceiverMerge { export interface Props extends Receiver.Props { + /** + * ### Different priority + * . + * When merging, all senders should normally have a different + * `priority`. Following this rule will prevent most of the + * confusion around merging. + * + * 💡 _Use case: tracking-backup console._ + * + * ### Same priority + * . + * If there are 2 senders with the same `priority`, + * then you need to specify the merging mode: + * + * - `HTP` = **H**ighest **t**akes **P**riority. This means + * that the receiver will use the highest channel value from + * all senders with the same `priority`. If there is a + * malfunction, channels may appear to be stuck, even when + * blacked-out on one console. + * 💡 _Use case: {@link https://youtu.be/vygFW9FDYtM parking a channel} + * or controlling {@link https://en.wiktionary.org/wiki/houselights houselights} + * from a different console._ + * + * - `LTP` = **L**atest **t**akes **P**riority. This means that + * the receiver will use the latest data that it receives from + * the senders with the highest `priority`. **This options is + * not recomended, because a malfunction will cause of lights + * to flicker uncontrollably.** + * 💡 _Use case: none._ + * + * ℹ️ Please refer to the README for more information. + * + * @default 'HTP' + */ + mode?: Mode; timeout?: number; } export interface EventMap extends Receiver.EventMap { changed: { universe: number; - addr: number; + payload: Payload; + }; + changedValue: { + universe: number; + address: number; newValue: number; oldValue: number; }; @@ -37,68 +75,101 @@ export namespace ReceiverMerge { lastPacket: Packet; }; } + + export interface PacketWithTime { + readonly packet: Packet; + readonly timestamp: number; + } + + export interface UniverseData { + referenceData: Payload; + servers: Map; + } + + export interface PreparedData { + universe: number; + maximumPriority: number; + universeData: UniverseData; + } } -export declare interface ReceiverMerge { +export declare interface MergingReceiver { on( type: K, listener: (event: Receiver.EventMap[K]) => void, ): this; } -export class ReceiverMerge extends Receiver { - constructor({ timeout = 5000, ...props }: ReceiverMerge.Props) { +export class MergingReceiver extends Receiver { + private readonly mode: MergingReceiver.Mode; + + private readonly timeout: number; + + private data = new Map(); + + constructor({ + mode = 'HTP', + timeout = 5000, + ...props + }: MergingReceiver.Props) { super(props); - this.timeout = timeout; - super.on('packet', this.mergePacket); - } - readonly timeout: number; + this.mode = mode; + this.timeout = timeout; - protected data = new Map(); + super.on('packet', (packet) => { + const data = this.prepareData(packet); + const mergedData = MergingReceiver[this.mode](data); + this.handleChanges(data, mergedData); + }); + } - public mergePacket(packet: Packet) { - const universe = packet.universe.toString(36); + private prepareData(packet: Packet): MergingReceiver.PreparedData { + const currentTime = performance.now(); + const universe = parseInt(packet.universe.toString(36), 10); const cid = packet.cid.toString(); + // universe is unknown if (!this.data.has(universe)) { this.data.set(universe, { - lastData: [], + referenceData: {}, servers: new Map(), }); + this.emit('newUniverse', { - universe: packet.universe, + universe, firstPacket: packet, }); } const universeData = this.data.get(universe); - if (!universeData) { throw new Error('[sACN] Internal Error: universeData is undefined'); } + // sender is unknown for this universe if (!universeData.servers.has(cid)) { this.emit('senderConnect', { cid: packet.cid, - universe: packet.universe, + universe, firstPacket: packet, }); } - const ts = performance.now(); - + // register current package universeData.servers.set(cid, { packet, - lastTimestamp: ts, + timestamp: currentTime, }); + // check whether sender disconnects setTimeout(() => { - if (universeData.servers.get(cid)?.lastTimestamp === ts) { + if (universeData.servers.get(cid)?.timestamp === currentTime) { universeData.servers.delete(cid); + this.emit('senderDisconnect', { cid: packet.cid, - universe: packet.universe, + universe, lastPacket: packet, }); } @@ -115,41 +186,95 @@ export class ReceiverMerge extends Receiver { } } - // HTP + return { + universe, + maximumPriority, + universeData, + }; + } + + private handleChanges( + data: MergingReceiver.PreparedData, + mergedData: Payload, + ): void { + const { referenceData } = data.universeData; + + // only changes + let changesDetected = false; + for (let ch = 1; ch <= 512; ch += 1) { + if (referenceData[ch] !== mergedData[ch]) { + changesDetected = true; + + const event: MergingReceiver.EventMap['changedValue'] = { + universe: data.universe, + address: ch, + newValue: mergedData[ch]!, + oldValue: referenceData[ch]!, + }; + super.emit('changedValue', event); + } + } + + if (changesDetected) { + this.data.get(data.universe)!.referenceData = mergedData; + + const event: MergingReceiver.EventMap['changed'] = { + universe: data.universe, + payload: mergedData, + }; + super.emit('changed', event); + } + } + + public static HTP(data: MergingReceiver.PreparedData): Payload { const mergedData: Payload = {}; - for (const [, { packet: thisPacket }] of universeData.servers) { + + for (const [, { packet }] of data.universeData.servers) { if ( - thisPacket.priority === maximumPriority && - thisPacket.universe === packet.universe + packet.priority === data.maximumPriority && + packet.universe === data.universe ) { - for (let i = 1; i <= 512; i += 1) { - const newValue = thisPacket.payload[i] || 0; - if ((mergedData[i] ?? 0) < newValue) { - mergedData[i] = newValue; + for (let ch = 1; ch <= 512; ch += 1) { + const newValue = packet.payload[ch] || 0; + if ((mergedData[ch] ?? 0) < newValue) { + mergedData[ch] = newValue; } } } } - // only changes - for (let i = 1; i <= 512; i += 1) { - if (universeData.lastData[i] !== mergedData[i]) { - super.emit('changed', { - universe: packet.universe, - addr: i, - newValue: mergedData[i], - oldValue: universeData.lastData[i], - }); + return mergedData; + } + + /** + * LTP can only operate per-universe, not per-channel. There is no + * situation where LTP-per-channel would be useful. + * + * Therefore, this function just returns the packet with the highest + * priority and the latest timestamp. + */ + public static LTP(data: MergingReceiver.PreparedData): Payload { + let maximumTimestamp = -Infinity; + for (const [, { packet, timestamp }] of data.universeData.servers) { + if ( + packet.priority === data.maximumPriority && + packet.universe === data.universe && + timestamp > maximumTimestamp + ) { + maximumTimestamp = timestamp; } - universeData.lastData[i] = mergedData[i] || 0; } - super.emit('changesDone'); - } - public clearCache() { - // causes every addr value to be emitted - for (const [, univese] of this.data) { - univese.lastData = {}; + for (const [, { packet, timestamp }] of data.universeData.servers) { + if ( + packet.priority === data.maximumPriority && + packet.universe === data.universe && + timestamp === maximumTimestamp + ) { + return packet.payload; + } } + + throw new Error('Internal error'); } } diff --git a/test/merging.test.ts b/test/merging.test.ts new file mode 100644 index 0000000..1b66b37 --- /dev/null +++ b/test/merging.test.ts @@ -0,0 +1,135 @@ +import { type Options, Packet } from '../src/packet'; +import { MergingReceiver } from '../src/merging'; +import type { Payload } from '../src/util'; + +/** helper function to make it easier to create the Map */ +function createPackets( + latestPackets: Record< + string, + Omit & { timestamp?: number } + >, +) { + const latestPacketsMap = new Map(); + for (const server in latestPackets) { + const { timestamp = NaN, ...options } = latestPackets[server]!; + latestPacketsMap.set(server, { + packet: new Packet({ ...options, universe: 14, sequence: NaN }), + timestamp, + }); + } + return latestPacketsMap; +} + +function deleteZeros(payload: Payload) { + for (const ch in payload) { + // eslint-disable-next-line no-param-reassign + if (!payload[ch]) delete payload[ch]; + } +} + +describe('HTP', () => { + it('2 receivers, different priority', () => { + const merged = MergingReceiver.HTP({ + maximumPriority: 124, + universe: 14, + universeData: { + referenceData: [NaN, 1, 2, 3, 4, 5], + servers: createPackets({ + 'main console': { priority: 124, payload: [NaN, 101, 102, 103] }, + 'backup console': { + priority: 123, + payload: [NaN, 201, 202, 203, 204], + }, + }), + }, + }); + expect(merged).toStrictEqual({ + // HTP applies to the whole packet, so channel 4 is 0, instead of 204 + // referenceData is irrelevant for HTP, so channel 5 is 0, instead of 5 + 1: 101, + 2: 102, + 3: 103, + }); + }); + + it('2 receivers, same priority', () => { + const merged = MergingReceiver.HTP({ + maximumPriority: 123, + universe: 14, + universeData: { + referenceData: [NaN, 1, 2, 3, 4, 5], + servers: createPackets({ + 'main console': { priority: 123, payload: [NaN, 101, 102, 103] }, + 'backup console': { + priority: 123, + payload: [NaN, 201, 202, 0, 204], + }, + }), + }, + }); + expect(merged).toStrictEqual({ + 1: 201, + 2: 202, + 3: 103, // main console has a higher value + 4: 204, + }); + }); +}); + +describe('LTP', () => { + it('2 receivers, different priority', () => { + const merged = MergingReceiver.LTP({ + maximumPriority: 124, + universe: 14, + universeData: { + referenceData: [NaN, 1, 2, 3, 4, 5], + servers: createPackets({ + 'main console': { + priority: 124, + payload: [NaN, 101, 102, 103], + timestamp: 1241, + }, + 'backup console': { + priority: 123, + payload: [NaN, 201, 202, 203, 204], + timestamp: 1242, // newer packet but lower pirority + }, + }), + }, + }); + deleteZeros(merged); + + // LTP applies per-universe only, so it just picked the + // packet with the highest priority + // eslint-disable-next-line no-sparse-arrays + expect(merged).toStrictEqual([, 101, 102, 103]); + }); + + it('2 receivers, same priority', () => { + const merged = MergingReceiver.LTP({ + maximumPriority: 123, + universe: 14, + universeData: { + referenceData: [NaN, 1, 2, 3, 4, 5], + servers: createPackets({ + 'main console': { + priority: 123, + payload: [NaN, 101, 102, 103], + timestamp: 1241, + }, + 'backup console': { + priority: 123, + payload: [NaN, 201, 202, 203, 204], + timestamp: 1242, + }, + }), + }, + }); + deleteZeros(merged); + + // LTP applies per-universe only, so it just picked the + // packet with the latest timestamp + // eslint-disable-next-line no-sparse-arrays + expect(merged).toStrictEqual([, 201, 202, 203, 204]); + }); +});