diff --git a/src/data/merging.ts b/src/data/merging.ts deleted file mode 100644 index 8fefa1d..0000000 --- a/src/data/merging.ts +++ /dev/null @@ -1,50 +0,0 @@ -import { DMXUniverse } from './universe'; -import { Packet } from '../packet'; - -export interface SenderData { - readonly cid: string; - readonly priority: number; - readonly universe: number; - readonly data: DMXUniverse; - readonly lastUpdate: number; - readonly sequence: number; -} - -export interface UniverseData { - referenceData: DMXUniverse; - servers: Map; -} - -export interface PreparedData { - universe: number; - maximumPriority: number; - universeData: UniverseData; -} - -export interface ProcessedData extends PreparedData { - mergedData: DMXUniverse; -} - -export interface MergingReceiverChannelChanged { - universe: number; - address: number; - newValue: number; - oldValue: number; -} - -export interface MergingReceiverUniverseChanged { - universe: number; - payload: number[]; -} - -export interface MergingReceiverSenderConnect { - cid: number; - universe: number; - firstPacket: Packet; -} - -export interface MergingReceiverSenderDisconnect { - cid: number; - universe: number; - lastPacket: Packet; -} diff --git a/src/data/universe.ts b/src/data/universe.ts deleted file mode 100644 index da7d87d..0000000 --- a/src/data/universe.ts +++ /dev/null @@ -1,13 +0,0 @@ -export class DMXUniverse { - readonly data: number[] = new Array(512); - - constructor(recordData: Record = {}) { - // init universe using `0` for every channel - this.data.fill(0); - - // set provided values - for (const addr in recordData) { - this.data[+addr - 1] = recordData[+addr] ?? 0; - } - } -} diff --git a/src/index.ts b/src/index.ts index 264e1bc..0831847 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,5 +1,5 @@ export * from './packet'; export * from './receiver'; -export * from './receiver/htp'; -export * from './receiver/ltp'; +// eslint-disable-next-line camelcase +export { MergingReceiver as unstable_MergingReceiver } from './merging'; export * from './sender'; diff --git a/src/merging.ts b/src/merging.ts index 7395fca..f5b4fc0 100644 --- a/src/merging.ts +++ b/src/merging.ts @@ -1,27 +1,65 @@ -import { performance } from 'node:perf_hooks'; -import { Packet } from './packet'; +import { performance } from 'perf_hooks'; +import type { Packet } from './packet'; import { Receiver } from './receiver'; import type { Payload } from './util'; -interface Universe { - lastData: Payload; - servers: Map; -} - -interface PacketWithTimestamp { - readonly packet: Packet; - readonly lastTimestamp: number; -} +/** + * @deprecated CAUTION: This feature is experimental, + * and has not been thoroughly tested. It may not behave + * correctly. There is no guarantee that it adheres to + * the E1.33 standard. + */ +export namespace MergingReceiver { + /** See {@link Props.mode here} for docs */ + export type Mode = 'HTP' | 'LTP'; -export namespace ReceiverMerge { export interface Props extends Receiver.Props { + /** + * ### Different priority + * . + * When merging, all senders should normally have a different + * `priority`. Following this rule will prevent most of the + * confusion around merging. + * + * 💡 _Use case: tracking-backup console._ + * + * ### Same priority + * . + * If there are 2 senders with the same `priority`, + * then you need to specify the merging mode: + * + * - `HTP` = **H**ighest **t**akes **P**riority. This means + * that the receiver will use the highest channel value from + * all senders with the same `priority`. If there is a + * malfunction, channels may appear to be stuck, even when + * blacked-out on one console. + * 💡 _Use case: {@link https://youtu.be/vygFW9FDYtM parking a channel} + * or controlling {@link https://en.wiktionary.org/wiki/houselights houselights} + * from a different console._ + * + * - `LTP` = **L**atest **t**akes **P**riority. This means that + * the receiver will use the latest data that it receives from + * the senders with the highest `priority`. **This options is + * not recomended, because a malfunction will cause of lights + * to flicker uncontrollably.** + * 💡 _Use case: none._ + * + * ℹ️ Please refer to the README for more information. + * + * @default 'HTP' + */ + mode?: Mode; timeout?: number; } export interface EventMap extends Receiver.EventMap { changed: { universe: number; - addr: number; + payload: Payload; + }; + changedValue: { + universe: number; + address: number; newValue: number; oldValue: number; }; @@ -37,68 +75,101 @@ export namespace ReceiverMerge { lastPacket: Packet; }; } + + export interface PacketWithTime { + readonly packet: Packet; + readonly timestamp: number; + } + + export interface UniverseData { + referenceData: Payload; + servers: Map; + } + + export interface PreparedData { + universe: number; + maximumPriority: number; + universeData: UniverseData; + } } -export declare interface ReceiverMerge { +export declare interface MergingReceiver { on( type: K, listener: (event: Receiver.EventMap[K]) => void, ): this; } -export class ReceiverMerge extends Receiver { - constructor({ timeout = 5000, ...props }: ReceiverMerge.Props) { +export class MergingReceiver extends Receiver { + private readonly mode: MergingReceiver.Mode; + + private readonly timeout: number; + + private data = new Map(); + + constructor({ + mode = 'HTP', + timeout = 5000, + ...props + }: MergingReceiver.Props) { super(props); - this.timeout = timeout; - super.on('packet', this.mergePacket); - } - readonly timeout: number; + this.mode = mode; + this.timeout = timeout; - protected data = new Map(); + super.on('packet', (packet) => { + const data = this.prepareData(packet); + const mergedData = MergingReceiver[this.mode](data); + this.handleChanges(data, mergedData); + }); + } - public mergePacket(packet: Packet) { - const universe = packet.universe.toString(36); + private prepareData(packet: Packet): MergingReceiver.PreparedData { + const currentTime = performance.now(); + const universe = parseInt(packet.universe.toString(36), 10); const cid = packet.cid.toString(); + // universe is unknown if (!this.data.has(universe)) { this.data.set(universe, { - lastData: [], + referenceData: {}, servers: new Map(), }); + this.emit('newUniverse', { - universe: packet.universe, + universe, firstPacket: packet, }); } const universeData = this.data.get(universe); - if (!universeData) { throw new Error('[sACN] Internal Error: universeData is undefined'); } + // sender is unknown for this universe if (!universeData.servers.has(cid)) { this.emit('senderConnect', { cid: packet.cid, - universe: packet.universe, + universe, firstPacket: packet, }); } - const ts = performance.now(); - + // register current package universeData.servers.set(cid, { packet, - lastTimestamp: ts, + timestamp: currentTime, }); + // check whether sender disconnects setTimeout(() => { - if (universeData.servers.get(cid)?.lastTimestamp === ts) { + if (universeData.servers.get(cid)?.timestamp === currentTime) { universeData.servers.delete(cid); + this.emit('senderDisconnect', { cid: packet.cid, - universe: packet.universe, + universe, lastPacket: packet, }); } @@ -115,41 +186,95 @@ export class ReceiverMerge extends Receiver { } } - // HTP + return { + universe, + maximumPriority, + universeData, + }; + } + + private handleChanges( + data: MergingReceiver.PreparedData, + mergedData: Payload, + ): void { + const { referenceData } = data.universeData; + + // only changes + let changesDetected = false; + for (let ch = 1; ch <= 512; ch += 1) { + if (referenceData[ch] !== mergedData[ch]) { + changesDetected = true; + + const event: MergingReceiver.EventMap['changedValue'] = { + universe: data.universe, + address: ch, + newValue: mergedData[ch]!, + oldValue: referenceData[ch]!, + }; + super.emit('changedValue', event); + } + } + + if (changesDetected) { + this.data.get(data.universe)!.referenceData = mergedData; + + const event: MergingReceiver.EventMap['changed'] = { + universe: data.universe, + payload: mergedData, + }; + super.emit('changed', event); + } + } + + public static HTP(data: MergingReceiver.PreparedData): Payload { const mergedData: Payload = {}; - for (const [, { packet: thisPacket }] of universeData.servers) { + + for (const [, { packet }] of data.universeData.servers) { if ( - thisPacket.priority === maximumPriority && - thisPacket.universe === packet.universe + packet.priority === data.maximumPriority && + packet.universe === data.universe ) { - for (let i = 1; i <= 512; i += 1) { - const newValue = thisPacket.payload[i] || 0; - if ((mergedData[i] ?? 0) < newValue) { - mergedData[i] = newValue; + for (let ch = 1; ch <= 512; ch += 1) { + const newValue = packet.payload[ch] || 0; + if ((mergedData[ch] ?? 0) < newValue) { + mergedData[ch] = newValue; } } } } - // only changes - for (let i = 1; i <= 512; i += 1) { - if (universeData.lastData[i] !== mergedData[i]) { - super.emit('changed', { - universe: packet.universe, - addr: i, - newValue: mergedData[i], - oldValue: universeData.lastData[i], - }); + return mergedData; + } + + /** + * LTP can only operate per-universe, not per-channel. There is no + * situation where LTP-per-channel would be useful. + * + * Therefore, this function just returns the packet with the highest + * priority and the latest timestamp. + */ + public static LTP(data: MergingReceiver.PreparedData): Payload { + let maximumTimestamp = -Infinity; + for (const [, { packet, timestamp }] of data.universeData.servers) { + if ( + packet.priority === data.maximumPriority && + packet.universe === data.universe && + timestamp > maximumTimestamp + ) { + maximumTimestamp = timestamp; } - universeData.lastData[i] = mergedData[i] || 0; } - super.emit('changesDone'); - } - public clearCache() { - // causes every addr value to be emitted - for (const [, univese] of this.data) { - univese.lastData = {}; + for (const [, { packet, timestamp }] of data.universeData.servers) { + if ( + packet.priority === data.maximumPriority && + packet.universe === data.universe && + timestamp === maximumTimestamp + ) { + return packet.payload; + } } + + throw new Error('Internal error'); } } diff --git a/src/receiver/abstract.ts b/src/receiver/abstract.ts deleted file mode 100644 index c4a9f95..0000000 --- a/src/receiver/abstract.ts +++ /dev/null @@ -1,175 +0,0 @@ -import { AssertionError } from 'assert'; -import { Receiver, ReceiverProps } from '../receiver'; -import { DMXUniverse } from '../data/universe'; -import { Packet } from '../packet'; -import { - MergingReceiverChannelChanged, - MergingReceiverSenderConnect, - MergingReceiverSenderDisconnect, - MergingReceiverUniverseChanged, - PreparedData, - ProcessedData, - UniverseData, -} from '../data/merging'; - -export interface MergingReceiverProps extends ReceiverProps { - universes?: number[]; - port?: number; - iface?: string; - reuseAddr?: boolean; - timeout?: number; -} - -export abstract class AbstractMergingReceiver extends Receiver { - protected readonly timeout: number; - - protected data = new Map(); - - constructor({ timeout = 5000, ...props }: MergingReceiverProps) { - super(props); - - this.timeout = timeout; - - super.on('packet', (packet) => { - const data = this.prepareData(packet); - const mergedData = this.mergeData(data); - if (mergedData !== null) { - this.handleChanges({ - ...data, - mergedData, - }); - } - }); - } - - protected prepareData(packet: Packet): PreparedData { - const currentTime = performance.now(); - const universe = parseInt(packet.universe.toString(36), 10); - const cid = packet.cid.toString(); - - // universe is unknown - if (!this.data.has(universe)) { - this.data.set(universe, { - referenceData: new DMXUniverse(), - servers: new Map(), - }); - - this.emit('newUniverse', { - universe, - firstPacket: packet, - }); - } - - const universeData: UniverseData = this.data.get(universe) as UniverseData; - if (!universeData) { - throw new Error('[sACN] Internal Error: universeData is undefined'); - } - - // sender is unknown for this universe - if (!universeData.servers.has(cid)) { - this.emit('senderConnect', { - cid: packet.cid, - universe, - firstPacket: packet, - }); - } - - // register current package - universeData.servers.set(cid, { - cid: packet.cid.toString(), - priority: packet.priority, - universe, - data: new DMXUniverse(packet.payload), - lastUpdate: currentTime, - sequence: packet.sequence, - }); - - // check whether sender disconnects - setTimeout(() => { - if (universeData.servers.get(cid)?.lastUpdate === currentTime) { - universeData.servers.delete(cid); - - this.emit('senderDisconnect', { - cid: packet.cid, - universe, - lastPacket: packet, - }); - } - }, this.timeout); - - // detect which source has the highest per-universe priority - let maximumPriority = 0; - for (const [, data] of universeData.servers) { - if ( - data.priority > maximumPriority && - data.universe === packet.universe - ) { - maximumPriority = data.priority; - } - } - - return { - universe, - maximumPriority, - universeData, - }; - } - - protected abstract mergeData(packet: PreparedData): DMXUniverse | null; - - protected handleChanges(data: ProcessedData): void { - const { referenceData } = data.universeData; - const { mergedData } = data; - - // only changes - let changesDetected = false; - for (let ch = 0; ch < 512; ch += 1) { - if (referenceData.data[ch] !== mergedData.data[ch]) { - changesDetected = true; - - super.emit('changedValue', { - universe: data.universe, - address: ch + 1, - newValue: mergedData.data[ch], - oldValue: referenceData.data[ch], - } as MergingReceiverChannelChanged); - } - } - - if (changesDetected) { - this.data.get(data.universe)!.referenceData = mergedData; - - super.emit('changed', { - universe: data.universe, - payload: mergedData.data, - } as MergingReceiverUniverseChanged); - } - } -} - -export declare interface AbstractMergingReceiver { - on( - event: Parameters[0], - listener: Parameters[1], - ): this; - on( - event: 'changedValue', - listener: (ev: MergingReceiverChannelChanged) => void, - ): this; - on( - event: 'changed', - listener: (ev: MergingReceiverUniverseChanged) => void, - ): this; - on( - event: 'senderConnect', - listener: (ev: MergingReceiverSenderConnect) => void, - ): this; - on( - event: 'senderDisconnect', - listener: (ev: MergingReceiverSenderDisconnect) => void, - ): this; - on(event: 'packet', listener: (packet: Packet) => void): this; - on(event: 'PacketCorruption', listener: (err: AssertionError) => void): this; - on(event: 'PacketOutOfOrder', listener: (err: Error) => void): this; - on(event: 'error', listener: (err: Error) => void): this; -} diff --git a/src/receiver/htp.ts b/src/receiver/htp.ts deleted file mode 100644 index 84b527b..0000000 --- a/src/receiver/htp.ts +++ /dev/null @@ -1,26 +0,0 @@ -import { AbstractMergingReceiver } from './abstract'; -import { PreparedData } from '../data/merging'; -import { DMXUniverse } from '../data/universe'; - -export class HTPMergingReceiver extends AbstractMergingReceiver { - // eslint-disable-next-line class-methods-use-this - protected mergeData(data: PreparedData): DMXUniverse | null { - const mergedData = new DMXUniverse(); - - for (const [, tmp] of data.universeData.servers) { - if ( - tmp.priority === data.maximumPriority && - tmp.universe === data.universe - ) { - for (let ch = 0; ch < 512; ch += 1) { - const newValue = tmp.data.data[ch] || 0; - if ((mergedData.data[ch] ?? 0) < newValue) { - mergedData.data[ch] = newValue; - } - } - } - } - - return mergedData; - } -} diff --git a/src/receiver/ltp.ts b/src/receiver/ltp.ts deleted file mode 100644 index 86e8eec..0000000 --- a/src/receiver/ltp.ts +++ /dev/null @@ -1,26 +0,0 @@ -import { DMXUniverse } from '../data/universe'; -import { PreparedData } from '../data/merging'; -import { AbstractMergingReceiver } from './abstract'; - -export class LTPMergingReceiver extends AbstractMergingReceiver { - // eslint-disable-next-line class-methods-use-this - protected mergeData(data: PreparedData): DMXUniverse | null { - const { referenceData } = data.universeData; - const mergedData = new DMXUniverse(); - - for (let ch = 0; ch < 512; ch += 1) { - let referenceTime = 0; - mergedData.data[ch] = referenceData.data[ch] || 0; - - for (const [, tmp] of data.universeData.servers) { - if (tmp.lastUpdate > referenceTime) { - referenceTime = tmp.lastUpdate; - mergedData.data[ch] = - tmp.data.data[ch] || (mergedData.data[ch] as number); - } - } - } - - return mergedData; - } -} diff --git a/test/merging.test.ts b/test/merging.test.ts new file mode 100644 index 0000000..1b66b37 --- /dev/null +++ b/test/merging.test.ts @@ -0,0 +1,135 @@ +import { type Options, Packet } from '../src/packet'; +import { MergingReceiver } from '../src/merging'; +import type { Payload } from '../src/util'; + +/** helper function to make it easier to create the Map */ +function createPackets( + latestPackets: Record< + string, + Omit & { timestamp?: number } + >, +) { + const latestPacketsMap = new Map(); + for (const server in latestPackets) { + const { timestamp = NaN, ...options } = latestPackets[server]!; + latestPacketsMap.set(server, { + packet: new Packet({ ...options, universe: 14, sequence: NaN }), + timestamp, + }); + } + return latestPacketsMap; +} + +function deleteZeros(payload: Payload) { + for (const ch in payload) { + // eslint-disable-next-line no-param-reassign + if (!payload[ch]) delete payload[ch]; + } +} + +describe('HTP', () => { + it('2 receivers, different priority', () => { + const merged = MergingReceiver.HTP({ + maximumPriority: 124, + universe: 14, + universeData: { + referenceData: [NaN, 1, 2, 3, 4, 5], + servers: createPackets({ + 'main console': { priority: 124, payload: [NaN, 101, 102, 103] }, + 'backup console': { + priority: 123, + payload: [NaN, 201, 202, 203, 204], + }, + }), + }, + }); + expect(merged).toStrictEqual({ + // HTP applies to the whole packet, so channel 4 is 0, instead of 204 + // referenceData is irrelevant for HTP, so channel 5 is 0, instead of 5 + 1: 101, + 2: 102, + 3: 103, + }); + }); + + it('2 receivers, same priority', () => { + const merged = MergingReceiver.HTP({ + maximumPriority: 123, + universe: 14, + universeData: { + referenceData: [NaN, 1, 2, 3, 4, 5], + servers: createPackets({ + 'main console': { priority: 123, payload: [NaN, 101, 102, 103] }, + 'backup console': { + priority: 123, + payload: [NaN, 201, 202, 0, 204], + }, + }), + }, + }); + expect(merged).toStrictEqual({ + 1: 201, + 2: 202, + 3: 103, // main console has a higher value + 4: 204, + }); + }); +}); + +describe('LTP', () => { + it('2 receivers, different priority', () => { + const merged = MergingReceiver.LTP({ + maximumPriority: 124, + universe: 14, + universeData: { + referenceData: [NaN, 1, 2, 3, 4, 5], + servers: createPackets({ + 'main console': { + priority: 124, + payload: [NaN, 101, 102, 103], + timestamp: 1241, + }, + 'backup console': { + priority: 123, + payload: [NaN, 201, 202, 203, 204], + timestamp: 1242, // newer packet but lower pirority + }, + }), + }, + }); + deleteZeros(merged); + + // LTP applies per-universe only, so it just picked the + // packet with the highest priority + // eslint-disable-next-line no-sparse-arrays + expect(merged).toStrictEqual([, 101, 102, 103]); + }); + + it('2 receivers, same priority', () => { + const merged = MergingReceiver.LTP({ + maximumPriority: 123, + universe: 14, + universeData: { + referenceData: [NaN, 1, 2, 3, 4, 5], + servers: createPackets({ + 'main console': { + priority: 123, + payload: [NaN, 101, 102, 103], + timestamp: 1241, + }, + 'backup console': { + priority: 123, + payload: [NaN, 201, 202, 203, 204], + timestamp: 1242, + }, + }), + }, + }); + deleteZeros(merged); + + // LTP applies per-universe only, so it just picked the + // packet with the latest timestamp + // eslint-disable-next-line no-sparse-arrays + expect(merged).toStrictEqual([, 201, 202, 203, 204]); + }); +});