From b494a9b82c38431d33104fbc4044e393a5167150 Mon Sep 17 00:00:00 2001 From: Hans Schallmoser <99032404+hansSchall@users.noreply.github.com> Date: Mon, 11 Mar 2024 10:25:32 +0100 Subject: [PATCH] Support merging (#43) --- src/receiver/abstract.ts | 167 +++++++++++++++++++++++++++++++++++++++ test/logData.ts | 12 +++ 2 files changed, 179 insertions(+) create mode 100644 src/receiver/abstract.ts create mode 100644 test/logData.ts diff --git a/src/receiver/abstract.ts b/src/receiver/abstract.ts new file mode 100644 index 0000000..2eb84eb --- /dev/null +++ b/src/receiver/abstract.ts @@ -0,0 +1,167 @@ +import { AssertionError } from 'assert'; +import { Packet } from '../packet'; +import { Receiver } from '../receiver'; +import type { Payload } from '../util'; + +interface MergeProps { + universes?: number[]; + port?: number; + iface?: string; + reuseAddr?: boolean; + timeout?: number; +} + +interface Universe { + lastData: Payload; + servers: Map; +} + +interface PacketWithTimestamp { + readonly packet: Packet; + readonly lastTimestamp: number; +} + +export class ReceiverMerge extends Receiver { + constructor({ timeout = 5000, ...props }: MergeProps) { + super(props); + this.timeout = timeout; + super.on('packet', this.mergePacket); + } + + readonly timeout: number; + + protected data = new Map(); + + public mergePacket(packet: Packet) { + const universe = packet.universe.toString(36); + const cid = packet.cid.toString(); + + if (!this.data.has(universe)) { + this.data.set(universe, { + lastData: [], + servers: new Map(), + }); + this.emit('newUniverse', { + universe: packet.universe, + firstPacket: packet, + }); + } + + const universeData = this.data.get(universe); + + if (!universeData) { + throw new Error('[sACN] Internal Error: universeData is undefined'); + } + + if (!universeData.servers.has(cid)) { + this.emit('senderConnect', { + cid: packet.cid, + universe: packet.universe, + firstPacket: packet, + }); + } + + const ts = performance.now(); + + universeData.servers.set(cid, { + packet, + lastTimestamp: ts, + }); + + setTimeout(() => { + if (universeData.servers.get(cid)?.lastTimestamp === ts) { + universeData.servers.delete(cid); + this.emit('senderDisconnect', { + cid: packet.cid, + universe: packet.universe, + lastPacket: packet, + }); + } + }, this.timeout); + + // detect which source has the highest per-universe priority + let maximumPriority = 0; + for (const [, { packet: thisPacket }] of universeData.servers) { + if ( + thisPacket.priority > maximumPriority && + thisPacket.universe === packet.universe + ) { + maximumPriority = thisPacket.priority; + } + } + + // HTP + const mergedData: Payload = {}; + for (const [, { packet: thisPacket }] of universeData.servers) { + if ( + thisPacket.priority === maximumPriority && + thisPacket.universe === packet.universe + ) { + for (let i = 1; i <= 512; i += 1) { + const newValue = thisPacket.payload[i] || 0; + if ((mergedData[i] ?? 0) < newValue) { + mergedData[i] = newValue; + } + } + } + } + + // only changes + for (let i = 1; i <= 512; i += 1) { + if (universeData.lastData[i] !== mergedData[i]) { + super.emit('changed', { + universe: packet.universe, + addr: i, + newValue: mergedData[i], + oldValue: universeData.lastData[i], + }); + } + universeData.lastData[i] = mergedData[i] || 0; + } + super.emit('changesDone'); + } + + public clearCache() { + // causes every addr value to be emitted + for (const [, univese] of this.data) { + univese.lastData = {}; + } + } +} +export declare interface ReceiverMerge { + // on(event: string, listener: (...args: any[]) => void): this; + on( + event: Parameters[0], + listener: Parameters[1], + ): this; + on( + event: 'changed', + listener: (ev: { + universe: number; + addr: number; + newValue: number; + oldValue: number; + }) => void, + ): this; + on(event: 'changesDone', listener: () => void): this; + on( + event: 'senderConnect', + listener: (ev: { + cid: number; + universe: number; + firstPacket: Packet; + }) => void, + ): this; + on( + event: 'senderDisconnect', + listener: (ev: { + cid: number; + universe: number; + lastPacket: Packet; + }) => void, + ): this; + on(event: 'packet', listener: (packet: Packet) => void): this; + on(event: 'PacketCorruption', listener: (err: AssertionError) => void): this; + on(event: 'PacketOutOfOrder', listener: (err: Error) => void): this; + on(event: 'error', listener: (err: Error) => void): this; +} diff --git a/test/logData.ts b/test/logData.ts new file mode 100644 index 0000000..3453191 --- /dev/null +++ b/test/logData.ts @@ -0,0 +1,12 @@ +import { ReceiverMerge } from '../src/index'; + +export async function main() { + const sacn = new ReceiverMerge({ + universes: [1, 2], + reuseAddr: true, + }); + sacn.on('changed', (ev) => { + console.log(ev.universe, ev.addr, Math.round(ev.newValue * 2.55)); + }); +} +main();