From 0c19b67d084eb87aed641dcb0224f1d9ccedf03f Mon Sep 17 00:00:00 2001 From: hansSchall <99032404+hansSchall@users.noreply.github.com> Date: Wed, 6 Apr 2022 18:12:00 +0200 Subject: [PATCH 01/15] add merge.ts fixes: #2 --- src/merge.ts | 102 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 102 insertions(+) create mode 100644 src/merge.ts diff --git a/src/merge.ts b/src/merge.ts new file mode 100644 index 0000000..435c35f --- /dev/null +++ b/src/merge.ts @@ -0,0 +1,102 @@ +import { Packet, Receiver } from "./index"; + +interface MergeProps { + universes?: number[]; + port?: number; + iface?: string; + reuseAddr?: boolean; + timeout?: number; +} + +export class ReceiverMerge extends Receiver { + constructor({ + universes = [1], + port = 5568, + iface = undefined, + reuseAddr = false, + timeout = 5000, + }: MergeProps) { + super({ + universes, + port, + iface, + reuseAddr + }); + this.timeout = timeout; + super.on("packet", this.mergePacket); + } + readonly timeout: number; + protected senders = new Map(); + protected lastData = new sACNData(); + mergePacket(packet: Packet) { + // used to identify each source (cid & universe) + let pid: string = packet.cid.toString() + "#" + packet.universe.toString(); + this.senders.set( + pid, + new SendersData( + packet.cid.toString(), + new sACNData(packet.payload), + packet.priority, + packet.sequence + )); + setTimeout(() => { + if (this.senders.get(pid)?.seq == packet.sequence) this.senders.delete(pid); + }, this.timeout); + + // detect which source has the highest per-universe priority + let maximumPrio = 0; + for (let [_, data] of this.senders) { + if (data.prio > maximumPrio) { + maximumPrio = data.prio; + } + } + + // HTP + let mergedData = new sACNData(); + for (let [_, data] of this.senders) { + if (data.prio == maximumPrio) { + let i = 0; + while (i < 512) { + let newValue = data.data.data[i]; + if (mergedData.data[i] < newValue) mergedData.data[i] = newValue; + i++; + } + } + } + + // console.log(mergedData); + // only changes + let i = 0; + while (i < 512) { + if (this.lastData.data[i] != mergedData.data[i]) { + super.emit("changed", { + universe: packet.universe, + addr: i + 1, + newValue: mergedData.data[i], + oldValue: this.lastData.data[i] + }) + } + this.lastData.data[i] = mergedData.data[i] + i++; + } + } +} +class SendersData { + constructor( + readonly cid: string, + readonly data: sACNData, + readonly prio: number, + readonly seq: number + ) { + + } +} +export class sACNData { + data: number[] = new Array(512); + constructor(recordData: Record = {}) { + this.data.fill(0); + for (let addr in recordData) { + this.data[+addr - 1] = recordData[+addr]; + } + } +} From d186e41237f11854deb5caf85bc7b8d0c35b29df Mon Sep 17 00:00:00 2001 From: hansSchall <99032404+hansSchall@users.noreply.github.com> Date: Wed, 6 Apr 2022 18:13:25 +0200 Subject: [PATCH 02/15] add exports for merge.ts --- src/index.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/src/index.ts b/src/index.ts index 4e7b8f5..06232f4 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,3 +1,4 @@ export * from './packet'; export * from './receiver'; export * from './sender'; +export * from './merge'; From d09c50f7ddf177b051e2df31bf6f9e149ac7c54c Mon Sep 17 00:00:00 2001 From: hansSchall <99032404+hansSchall@users.noreply.github.com> Date: Fri, 8 Apr 2022 12:40:11 +0200 Subject: [PATCH 03/15] Update src/merge.ts MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit add sender connect event Co-authored-by: Kyℓe Hensel --- src/merge.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/src/merge.ts b/src/merge.ts index 435c35f..2c18111 100644 --- a/src/merge.ts +++ b/src/merge.ts @@ -31,6 +31,7 @@ export class ReceiverMerge extends Receiver { mergePacket(packet: Packet) { // used to identify each source (cid & universe) let pid: string = packet.cid.toString() + "#" + packet.universe.toString(); + if (!this.senders.has(pid)) this.emit('senderConnect', packet); this.senders.set( pid, new SendersData( From 3011c11d8dc21002a39c641987e4cbbb6b9c503c Mon Sep 17 00:00:00 2001 From: hansSchall <99032404+hansSchall@users.noreply.github.com> Date: Fri, 8 Apr 2022 12:40:56 +0200 Subject: [PATCH 04/15] Update src/merge.ts MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit clean up constructor Co-authored-by: Kyℓe Hensel --- src/merge.ts | 15 ++------------- 1 file changed, 2 insertions(+), 13 deletions(-) diff --git a/src/merge.ts b/src/merge.ts index 2c18111..7b09530 100644 --- a/src/merge.ts +++ b/src/merge.ts @@ -9,19 +9,8 @@ interface MergeProps { } export class ReceiverMerge extends Receiver { - constructor({ - universes = [1], - port = 5568, - iface = undefined, - reuseAddr = false, - timeout = 5000, - }: MergeProps) { - super({ - universes, - port, - iface, - reuseAddr - }); + constructor({ timeout = 5000, ...props }: MergeProps) { + super(props); this.timeout = timeout; super.on("packet", this.mergePacket); } From c6c13aec68cee486a9c44914eb07d0472ec1bd7a Mon Sep 17 00:00:00 2001 From: hansSchall <99032404+hansSchall@users.noreply.github.com> Date: Sat, 9 Apr 2022 17:47:53 +0200 Subject: [PATCH 05/15] add senderDisconnect event --- src/merge.ts | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/merge.ts b/src/merge.ts index 7b09530..764809e 100644 --- a/src/merge.ts +++ b/src/merge.ts @@ -30,7 +30,10 @@ export class ReceiverMerge extends Receiver { packet.sequence )); setTimeout(() => { - if (this.senders.get(pid)?.seq == packet.sequence) this.senders.delete(pid); + if (this.senders.get(pid)?.seq == packet.sequence) { + this.senders.delete(pid); + this.emit('senderDisonnect', packet.cid); + }; }, this.timeout); // detect which source has the highest per-universe priority From ccdc33a704761ab9acd77fc1323916c464308511 Mon Sep 17 00:00:00 2001 From: Hans Schallmoser Date: Tue, 19 Apr 2022 12:48:34 +0200 Subject: [PATCH 06/15] fix ts-strict problems add default value 0 --- src/merge.ts | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/merge.ts b/src/merge.ts index 764809e..d0db607 100644 --- a/src/merge.ts +++ b/src/merge.ts @@ -50,8 +50,9 @@ export class ReceiverMerge extends Receiver { if (data.prio == maximumPrio) { let i = 0; while (i < 512) { - let newValue = data.data.data[i]; - if (mergedData.data[i] < newValue) mergedData.data[i] = newValue; + let newValue = data.data.data[i] || 0; + //if(mergedData is not defined) do nothing + if (mergedData.data[i] || Infinity < newValue) mergedData.data[i] = newValue; i++; } } @@ -69,7 +70,7 @@ export class ReceiverMerge extends Receiver { oldValue: this.lastData.data[i] }) } - this.lastData.data[i] = mergedData.data[i] + this.lastData.data[i] = mergedData.data[i] || 0; i++; } } @@ -89,7 +90,7 @@ export class sACNData { constructor(recordData: Record = {}) { this.data.fill(0); for (let addr in recordData) { - this.data[+addr - 1] = recordData[+addr]; + this.data[+addr - 1] = recordData[+addr] || 0; } } } From 34c6f9489490e62b72fcae32fc42ddcf2760c4cb Mon Sep 17 00:00:00 2001 From: Hans Schallmoser Date: Tue, 19 Apr 2022 12:53:36 +0200 Subject: [PATCH 07/15] more detailed event objects - more detailed senderConnect - more detailed senderDisconnect - convert ServersData to interface --- src/merge.ts | 39 ++++++++++++++++++++++----------------- 1 file changed, 22 insertions(+), 17 deletions(-) diff --git a/src/merge.ts b/src/merge.ts index d0db607..1ce8e3d 100644 --- a/src/merge.ts +++ b/src/merge.ts @@ -20,19 +20,28 @@ export class ReceiverMerge extends Receiver { mergePacket(packet: Packet) { // used to identify each source (cid & universe) let pid: string = packet.cid.toString() + "#" + packet.universe.toString(); - if (!this.senders.has(pid)) this.emit('senderConnect', packet); + if (!this.senders.has(pid)) this.emit('senderConnect', { + cid: packet.cid, + universe: packet.universe, + firstPacket: packet + }); this.senders.set( pid, - new SendersData( - packet.cid.toString(), - new sACNData(packet.payload), - packet.priority, - packet.sequence - )); + { + cid: packet.cid.toString(), + data: new sACNData(packet.payload), + prio: packet.priority, + seq: packet.sequence + }); setTimeout(() => { if (this.senders.get(pid)?.seq == packet.sequence) { this.senders.delete(pid); - this.emit('senderDisonnect', packet.cid); + // `packet` is the last packet the source sent + this.emit('senderDisonnect', { + cid: packet.cid, + universe: packet.universe, + lastPacket: packet + }); }; }, this.timeout); @@ -75,15 +84,11 @@ export class ReceiverMerge extends Receiver { } } } -class SendersData { - constructor( - readonly cid: string, - readonly data: sACNData, - readonly prio: number, - readonly seq: number - ) { - - } +interface SendersData { + readonly cid: string, + readonly data: sACNData, + readonly prio: number, + readonly seq: number } export class sACNData { data: number[] = new Array(512); From f401f48473beb95847e9bfa107ab0861baad1548 Mon Sep 17 00:00:00 2001 From: Hans Schallmoser Date: Tue, 19 Apr 2022 17:26:21 +0200 Subject: [PATCH 08/15] update sender-id format - converted from string to [cid, universe] - renamed from pid to sid (sender-id) --- src/merge.ts | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/src/merge.ts b/src/merge.ts index 1ce8e3d..c53b68b 100644 --- a/src/merge.ts +++ b/src/merge.ts @@ -7,6 +7,7 @@ interface MergeProps { reuseAddr?: boolean; timeout?: number; } +type Sid = [string, number] export class ReceiverMerge extends Receiver { constructor({ timeout = 5000, ...props }: MergeProps) { @@ -15,18 +16,18 @@ export class ReceiverMerge extends Receiver { super.on("packet", this.mergePacket); } readonly timeout: number; - protected senders = new Map(); + protected senders = new Map(); protected lastData = new sACNData(); mergePacket(packet: Packet) { // used to identify each source (cid & universe) - let pid: string = packet.cid.toString() + "#" + packet.universe.toString(); - if (!this.senders.has(pid)) this.emit('senderConnect', { + let sid: Sid = [packet.cid.toString(), packet.universe]; + if (!this.senders.has(sid)) this.emit('senderConnect', { cid: packet.cid, universe: packet.universe, firstPacket: packet }); this.senders.set( - pid, + sid, { cid: packet.cid.toString(), data: new sACNData(packet.payload), @@ -34,8 +35,8 @@ export class ReceiverMerge extends Receiver { seq: packet.sequence }); setTimeout(() => { - if (this.senders.get(pid)?.seq == packet.sequence) { - this.senders.delete(pid); + if (this.senders.get(sid)?.seq == packet.sequence) { + this.senders.delete(sid); // `packet` is the last packet the source sent this.emit('senderDisonnect', { cid: packet.cid, @@ -83,6 +84,9 @@ export class ReceiverMerge extends Receiver { i++; } } + getSenders() { + return [...this.senders.keys()].map(([cid, universe]) => ({ cid, universe })); + } } interface SendersData { readonly cid: string, From 2343cc68606d116c063b6e8208d974cca766329e Mon Sep 17 00:00:00 2001 From: Hans Schallmoser Date: Mon, 16 May 2022 19:13:54 +0200 Subject: [PATCH 09/15] fix multiple bugs - fix multiple universes bug (add universe equality constraint in merge core) - fix sender-id bugs (identify senders by a string instead of a [string,number] - add .on() typescript call signatures - add .clearCache() - add .on("changesDone") - fix spelling 'senderDis*C*onnect --- src/merge.ts | 56 +++++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 44 insertions(+), 12 deletions(-) diff --git a/src/merge.ts b/src/merge.ts index c53b68b..2b83830 100644 --- a/src/merge.ts +++ b/src/merge.ts @@ -1,4 +1,5 @@ import { Packet, Receiver } from "./index"; +import { AssertionError } from "assert"; interface MergeProps { universes?: number[]; @@ -7,7 +8,6 @@ interface MergeProps { reuseAddr?: boolean; timeout?: number; } -type Sid = [string, number] export class ReceiverMerge extends Receiver { constructor({ timeout = 5000, ...props }: MergeProps) { @@ -16,11 +16,12 @@ export class ReceiverMerge extends Receiver { super.on("packet", this.mergePacket); } readonly timeout: number; - protected senders = new Map(); + protected senders = new Map(); protected lastData = new sACNData(); mergePacket(packet: Packet) { // used to identify each source (cid & universe) - let sid: Sid = [packet.cid.toString(), packet.universe]; + let sid = packet.universe.toString(36) + " " + packet.cid.toString(); + // console.log(sid); if (!this.senders.has(sid)) this.emit('senderConnect', { cid: packet.cid, universe: packet.universe, @@ -32,13 +33,14 @@ export class ReceiverMerge extends Receiver { cid: packet.cid.toString(), data: new sACNData(packet.payload), prio: packet.priority, - seq: packet.sequence + seq: packet.sequence, + universe: packet.universe, }); setTimeout(() => { if (this.senders.get(sid)?.seq == packet.sequence) { this.senders.delete(sid); // `packet` is the last packet the source sent - this.emit('senderDisonnect', { + this.emit('senderDisconnect', { cid: packet.cid, universe: packet.universe, lastPacket: packet @@ -49,7 +51,7 @@ export class ReceiverMerge extends Receiver { // detect which source has the highest per-universe priority let maximumPrio = 0; for (let [_, data] of this.senders) { - if (data.prio > maximumPrio) { + if (data.prio > maximumPrio && data.universe == packet.universe) { maximumPrio = data.prio; } } @@ -57,12 +59,11 @@ export class ReceiverMerge extends Receiver { // HTP let mergedData = new sACNData(); for (let [_, data] of this.senders) { - if (data.prio == maximumPrio) { + if (data.prio == maximumPrio && data.universe == packet.universe) { let i = 0; while (i < 512) { let newValue = data.data.data[i] || 0; - //if(mergedData is not defined) do nothing - if (mergedData.data[i] || Infinity < newValue) mergedData.data[i] = newValue; + if ((mergedData.data[i] ?? 0) < newValue) mergedData.data[i] = newValue; i++; } } @@ -83,23 +84,54 @@ export class ReceiverMerge extends Receiver { this.lastData.data[i] = mergedData.data[i] || 0; i++; } + super.emit("changesDone"); + } + clearCache() { + // causes every addr value to be emitted + this.lastData = new sACNData(); } getSenders() { return [...this.senders.keys()].map(([cid, universe]) => ({ cid, universe })); } } +export declare interface ReceiverMerge { + on(event: string, listener: (...args: any[]) => void): this; + on(event: Parameters[0], listener: Parameters[1]): this; + on(event: "changed", listener: (ev: { + universe: number, + addr: number, + newValue: number, + oldValue: number + }) => void): this; + on(event: "changesDone", listener: () => void): this; + on(event: "senderConnect", listener: (ev: { + cid: number, + universe: number, + firstPacket: Packet + }) => void): this; + on(event: "senderDisconnect", listener: (ev: { + cid: number, + universe: number, + lastPacket: Packet + }) => void): this; + on(event: 'packet', listener: (packet: Packet) => void): this; + on(event: 'PacketCorruption', listener: (err: AssertionError) => void): this; + on(event: 'PacketOutOfOrder', listener: (err: Error) => void): this; + on(event: 'error', listener: (err: Error) => void): this; +} interface SendersData { readonly cid: string, readonly data: sACNData, readonly prio: number, - readonly seq: number + readonly seq: number, + readonly universe: number, } export class sACNData { data: number[] = new Array(512); constructor(recordData: Record = {}) { this.data.fill(0); for (let addr in recordData) { - this.data[+addr - 1] = recordData[+addr] || 0; + this.data[+addr - 1] = recordData[+addr] ?? 0; } } -} +} \ No newline at end of file From c733e33138e69a0727471d3d6275faec7577f1d1 Mon Sep 17 00:00:00 2001 From: Hans Schallmoser <99032404+hansSchall@users.noreply.github.com> Date: Mon, 10 Oct 2022 19:50:34 +0200 Subject: [PATCH 10/15] multiple improvements - improved multi-universe management - improved timeout management - improved sender/universe storage --- src/merge.ts | 97 ++++++++++++++++++++++++++++++++----------------- test/logData.ts | 12 ++++++ 2 files changed, 75 insertions(+), 34 deletions(-) create mode 100644 test/logData.ts diff --git a/src/merge.ts b/src/merge.ts index 2b83830..91334b9 100644 --- a/src/merge.ts +++ b/src/merge.ts @@ -9,37 +9,75 @@ interface MergeProps { timeout?: number; } +interface Universe { + lastData: sACNData, + servers: Map +} + +interface SendersData { + readonly cid: string, + readonly data: sACNData, + readonly prio: number, + readonly seq: number, + readonly universe: number, + readonly lastTimestamp: number, +} + export class ReceiverMerge extends Receiver { constructor({ timeout = 5000, ...props }: MergeProps) { super(props); this.timeout = timeout; super.on("packet", this.mergePacket); } + readonly timeout: number; - protected senders = new Map(); - protected lastData = new sACNData(); + + protected data = new Map(); + mergePacket(packet: Packet) { - // used to identify each source (cid & universe) - let sid = packet.universe.toString(36) + " " + packet.cid.toString(); - // console.log(sid); - if (!this.senders.has(sid)) this.emit('senderConnect', { - cid: packet.cid, - universe: packet.universe, - firstPacket: packet - }); - this.senders.set( - sid, + const universe = packet.universe.toString(36); + const cid = packet.cid.toString(); + + if (!this.data.has(universe)) { + this.data.set(universe, { + lastData: new sACNData(), + servers: new Map(), + }); + this.emit('newUniverse', { + universe: packet.universe, + firstPacket: packet, + }); + } + + const universeData: Universe = this.data.get(universe) as Universe; + + if (!universeData) + throw new Error("[sACN] Internal Error: universeData is undefined") + + if (!universeData.servers.has(cid)) { + this.emit('senderConnect', { + cid: packet.cid, + universe: packet.universe, + firstPacket: packet, + }); + } + + const ts = performance.now(); + + universeData.servers.set(cid, { cid: packet.cid.toString(), data: new sACNData(packet.payload), prio: packet.priority, seq: packet.sequence, universe: packet.universe, - }); + lastTimestamp: ts, + } + ); + setTimeout(() => { - if (this.senders.get(sid)?.seq == packet.sequence) { - this.senders.delete(sid); - // `packet` is the last packet the source sent + if (universeData.servers.get(cid)?.lastTimestamp == ts) { + universeData.servers.delete(cid) this.emit('senderDisconnect', { cid: packet.cid, universe: packet.universe, @@ -50,7 +88,7 @@ export class ReceiverMerge extends Receiver { // detect which source has the highest per-universe priority let maximumPrio = 0; - for (let [_, data] of this.senders) { + for (let [_, data] of universeData.servers) { if (data.prio > maximumPrio && data.universe == packet.universe) { maximumPrio = data.prio; } @@ -58,7 +96,7 @@ export class ReceiverMerge extends Receiver { // HTP let mergedData = new sACNData(); - for (let [_, data] of this.senders) { + for (let [_, data] of universeData.servers) { if (data.prio == maximumPrio && data.universe == packet.universe) { let i = 0; while (i < 512) { @@ -69,29 +107,27 @@ export class ReceiverMerge extends Receiver { } } - // console.log(mergedData); // only changes let i = 0; while (i < 512) { - if (this.lastData.data[i] != mergedData.data[i]) { + if (universeData.lastData.data[i] != mergedData.data[i]) { super.emit("changed", { universe: packet.universe, addr: i + 1, newValue: mergedData.data[i], - oldValue: this.lastData.data[i] + oldValue: universeData.lastData.data[i] }) } - this.lastData.data[i] = mergedData.data[i] || 0; + universeData.lastData.data[i] = mergedData.data[i] || 0; i++; } super.emit("changesDone"); } clearCache() { // causes every addr value to be emitted - this.lastData = new sACNData(); - } - getSenders() { - return [...this.senders.keys()].map(([cid, universe]) => ({ cid, universe })); + for (let [, univese] of this.data) { + univese.lastData = new sACNData(); + } } } export declare interface ReceiverMerge { @@ -119,13 +155,6 @@ export declare interface ReceiverMerge { on(event: 'PacketOutOfOrder', listener: (err: Error) => void): this; on(event: 'error', listener: (err: Error) => void): this; } -interface SendersData { - readonly cid: string, - readonly data: sACNData, - readonly prio: number, - readonly seq: number, - readonly universe: number, -} export class sACNData { data: number[] = new Array(512); constructor(recordData: Record = {}) { @@ -134,4 +163,4 @@ export class sACNData { this.data[+addr - 1] = recordData[+addr] ?? 0; } } -} \ No newline at end of file +} diff --git a/test/logData.ts b/test/logData.ts new file mode 100644 index 0000000..679e03d --- /dev/null +++ b/test/logData.ts @@ -0,0 +1,12 @@ +import { ReceiverMerge } from "../src/index"; + +export async function main() { + const sacn = new ReceiverMerge({ + universes: [1, 2], + reuseAddr: true, + }) + sacn.on("changed", (ev) => { + console.log(ev.universe, ev.addr, Math.round(ev.newValue * 2.55)); + }) +} +main(); From 8405cd494a5018e72ecda6700627b785dac89071 Mon Sep 17 00:00:00 2001 From: Hans Schallmoser <99032404+hansSchall@users.noreply.github.com> Date: Thu, 9 Feb 2023 09:15:59 +0100 Subject: [PATCH 11/15] fix all eslint problems --- src/merge.ts | 289 +++++++++++++++++++++++++----------------------- src/sACNData.ts | 10 ++ test/logData.ts | 16 +-- 3 files changed, 166 insertions(+), 149 deletions(-) create mode 100644 src/sACNData.ts diff --git a/src/merge.ts b/src/merge.ts index 91334b9..70fdef3 100644 --- a/src/merge.ts +++ b/src/merge.ts @@ -1,166 +1,173 @@ -import { Packet, Receiver } from "./index"; -import { AssertionError } from "assert"; +import { AssertionError } from 'assert'; +import { Packet } from './packet'; +import { Receiver } from './receiver'; +import { SACNData } from './sACNData'; interface MergeProps { - universes?: number[]; - port?: number; - iface?: string; - reuseAddr?: boolean; - timeout?: number; + universes?: number[]; + port?: number; + iface?: string; + reuseAddr?: boolean; + timeout?: number; } interface Universe { - lastData: sACNData, - servers: Map + lastData: SACNData; + servers: Map; } interface SendersData { - readonly cid: string, - readonly data: sACNData, - readonly prio: number, - readonly seq: number, - readonly universe: number, - readonly lastTimestamp: number, + readonly cid: string; + readonly data: SACNData; + readonly prio: number; + readonly seq: number; + readonly universe: number; + readonly lastTimestamp: number; } export class ReceiverMerge extends Receiver { - constructor({ timeout = 5000, ...props }: MergeProps) { - super(props); - this.timeout = timeout; - super.on("packet", this.mergePacket); + constructor({ timeout = 5000, ...props }: MergeProps) { + super(props); + this.timeout = timeout; + super.on('packet', this.mergePacket); + } + + readonly timeout: number; + + protected data = new Map(); + + mergePacket(packet: Packet) { + const universe = packet.universe.toString(36); + const cid = packet.cid.toString(); + + if (!this.data.has(universe)) { + this.data.set(universe, { + lastData: new SACNData(), + servers: new Map(), + }); + this.emit('newUniverse', { + universe: packet.universe, + firstPacket: packet, + }); } - readonly timeout: number; + const universeData: Universe = this.data.get(universe) as Universe; - protected data = new Map(); - - mergePacket(packet: Packet) { - const universe = packet.universe.toString(36); - const cid = packet.cid.toString(); - - if (!this.data.has(universe)) { - this.data.set(universe, { - lastData: new sACNData(), - servers: new Map(), - }); - this.emit('newUniverse', { - universe: packet.universe, - firstPacket: packet, - }); - } - - const universeData: Universe = this.data.get(universe) as Universe; - - if (!universeData) - throw new Error("[sACN] Internal Error: universeData is undefined") - - if (!universeData.servers.has(cid)) { - this.emit('senderConnect', { - cid: packet.cid, - universe: packet.universe, - firstPacket: packet, - }); - } + if (!universeData) { + throw new Error('[sACN] Internal Error: universeData is undefined'); + } - const ts = performance.now(); - - universeData.servers.set(cid, - { - cid: packet.cid.toString(), - data: new sACNData(packet.payload), - prio: packet.priority, - seq: packet.sequence, - universe: packet.universe, - lastTimestamp: ts, - } - ); - - setTimeout(() => { - if (universeData.servers.get(cid)?.lastTimestamp == ts) { - universeData.servers.delete(cid) - this.emit('senderDisconnect', { - cid: packet.cid, - universe: packet.universe, - lastPacket: packet - }); - }; - }, this.timeout); - - // detect which source has the highest per-universe priority - let maximumPrio = 0; - for (let [_, data] of universeData.servers) { - if (data.prio > maximumPrio && data.universe == packet.universe) { - maximumPrio = data.prio; - } - } + if (!universeData.servers.has(cid)) { + this.emit('senderConnect', { + cid: packet.cid, + universe: packet.universe, + firstPacket: packet, + }); + } - // HTP - let mergedData = new sACNData(); - for (let [_, data] of universeData.servers) { - if (data.prio == maximumPrio && data.universe == packet.universe) { - let i = 0; - while (i < 512) { - let newValue = data.data.data[i] || 0; - if ((mergedData.data[i] ?? 0) < newValue) mergedData.data[i] = newValue; - i++; - } - } - } + const ts = performance.now(); + + universeData.servers.set(cid, { + cid: packet.cid.toString(), + data: new SACNData(packet.payload), + prio: packet.priority, + seq: packet.sequence, + universe: packet.universe, + lastTimestamp: ts, + }); + + setTimeout(() => { + if (universeData.servers.get(cid)?.lastTimestamp === ts) { + universeData.servers.delete(cid); + this.emit('senderDisconnect', { + cid: packet.cid, + universe: packet.universe, + lastPacket: packet, + }); + } + }, this.timeout); + + // detect which source has the highest per-universe priority + let maximumPrio = 0; + for (const [, data] of universeData.servers) { + if (data.prio > maximumPrio && data.universe === packet.universe) { + maximumPrio = data.prio; + } + } - // only changes + // HTP + const mergedData = new SACNData(); + for (const [, data] of universeData.servers) { + if (data.prio === maximumPrio && data.universe === packet.universe) { let i = 0; while (i < 512) { - if (universeData.lastData.data[i] != mergedData.data[i]) { - super.emit("changed", { - universe: packet.universe, - addr: i + 1, - newValue: mergedData.data[i], - oldValue: universeData.lastData.data[i] - }) - } - universeData.lastData.data[i] = mergedData.data[i] || 0; - i++; + const newValue = data.data.data[i] || 0; + if ((mergedData.data[i] ?? 0) < newValue) { + mergedData.data[i] = newValue; + } + i += 1; } - super.emit("changesDone"); + } } - clearCache() { - // causes every addr value to be emitted - for (let [, univese] of this.data) { - univese.lastData = new sACNData(); - } + + // only changes + let i = 0; + while (i < 512) { + if (universeData.lastData.data[i] !== mergedData.data[i]) { + super.emit('changed', { + universe: packet.universe, + addr: i + 1, + newValue: mergedData.data[i], + oldValue: universeData.lastData.data[i], + }); + } + universeData.lastData.data[i] = mergedData.data[i] || 0; + i += 1; } + super.emit('changesDone'); + } + + clearCache() { + // causes every addr value to be emitted + for (const [, univese] of this.data) { + univese.lastData = new SACNData(); + } + } } export declare interface ReceiverMerge { - on(event: string, listener: (...args: any[]) => void): this; - on(event: Parameters[0], listener: Parameters[1]): this; - on(event: "changed", listener: (ev: { - universe: number, - addr: number, - newValue: number, - oldValue: number - }) => void): this; - on(event: "changesDone", listener: () => void): this; - on(event: "senderConnect", listener: (ev: { - cid: number, - universe: number, - firstPacket: Packet - }) => void): this; - on(event: "senderDisconnect", listener: (ev: { - cid: number, - universe: number, - lastPacket: Packet - }) => void): this; - on(event: 'packet', listener: (packet: Packet) => void): this; - on(event: 'PacketCorruption', listener: (err: AssertionError) => void): this; - on(event: 'PacketOutOfOrder', listener: (err: Error) => void): this; - on(event: 'error', listener: (err: Error) => void): this; -} -export class sACNData { - data: number[] = new Array(512); - constructor(recordData: Record = {}) { - this.data.fill(0); - for (let addr in recordData) { - this.data[+addr - 1] = recordData[+addr] ?? 0; - } - } + // on(event: string, listener: (...args: any[]) => void): this; + on( + event: Parameters[0], + listener: Parameters[1], + ): this; + on( + event: 'changed', + listener: (ev: { + universe: number; + addr: number; + newValue: number; + oldValue: number; + }) => void, + ): this; + on(event: 'changesDone', listener: () => void): this; + on( + event: 'senderConnect', + listener: (ev: { + cid: number; + universe: number; + firstPacket: Packet; + }) => void, + ): this; + on( + event: 'senderDisconnect', + listener: (ev: { + cid: number; + universe: number; + lastPacket: Packet; + }) => void, + ): this; + on(event: 'packet', listener: (packet: Packet) => void): this; + on(event: 'PacketCorruption', listener: (err: AssertionError) => void): this; + on(event: 'PacketOutOfOrder', listener: (err: Error) => void): this; + on(event: 'error', listener: (err: Error) => void): this; } diff --git a/src/sACNData.ts b/src/sACNData.ts new file mode 100644 index 0000000..bab9dab --- /dev/null +++ b/src/sACNData.ts @@ -0,0 +1,10 @@ +export class SACNData { + data: number[] = new Array(512); + + constructor(recordData: Record = {}) { + this.data.fill(0); + for (const addr in recordData) { + this.data[+addr - 1] = recordData[+addr] ?? 0; + } + } +} diff --git a/test/logData.ts b/test/logData.ts index 679e03d..3453191 100644 --- a/test/logData.ts +++ b/test/logData.ts @@ -1,12 +1,12 @@ -import { ReceiverMerge } from "../src/index"; +import { ReceiverMerge } from '../src/index'; export async function main() { - const sacn = new ReceiverMerge({ - universes: [1, 2], - reuseAddr: true, - }) - sacn.on("changed", (ev) => { - console.log(ev.universe, ev.addr, Math.round(ev.newValue * 2.55)); - }) + const sacn = new ReceiverMerge({ + universes: [1, 2], + reuseAddr: true, + }); + sacn.on('changed', (ev) => { + console.log(ev.universe, ev.addr, Math.round(ev.newValue * 2.55)); + }); } main(); From 2cdfa5666174c17c0afd6e3fb30e35e5b8000022 Mon Sep 17 00:00:00 2001 From: Kyle Hensel Date: Mon, 11 Mar 2024 19:47:46 +1100 Subject: [PATCH 12/15] use the same structure for `payload` as existing code --- src/merge.ts | 40 ++++++++++++++++++---------------------- src/sACNData.ts | 10 ---------- 2 files changed, 18 insertions(+), 32 deletions(-) delete mode 100644 src/sACNData.ts diff --git a/src/merge.ts b/src/merge.ts index 70fdef3..bf74dbf 100644 --- a/src/merge.ts +++ b/src/merge.ts @@ -1,7 +1,7 @@ import { AssertionError } from 'assert'; import { Packet } from './packet'; import { Receiver } from './receiver'; -import { SACNData } from './sACNData'; +import type { Payload } from './util'; interface MergeProps { universes?: number[]; @@ -12,13 +12,13 @@ interface MergeProps { } interface Universe { - lastData: SACNData; + lastData: Payload; servers: Map; } interface SendersData { readonly cid: string; - readonly data: SACNData; + readonly data: Payload; readonly prio: number; readonly seq: number; readonly universe: number; @@ -42,7 +42,7 @@ export class ReceiverMerge extends Receiver { if (!this.data.has(universe)) { this.data.set(universe, { - lastData: new SACNData(), + lastData: [], servers: new Map(), }); this.emit('newUniverse', { @@ -51,7 +51,7 @@ export class ReceiverMerge extends Receiver { }); } - const universeData: Universe = this.data.get(universe) as Universe; + const universeData = this.data.get(universe); if (!universeData) { throw new Error('[sACN] Internal Error: universeData is undefined'); @@ -69,7 +69,7 @@ export class ReceiverMerge extends Receiver { universeData.servers.set(cid, { cid: packet.cid.toString(), - data: new SACNData(packet.payload), + data: packet.payload, prio: packet.priority, seq: packet.sequence, universe: packet.universe, @@ -96,33 +96,29 @@ export class ReceiverMerge extends Receiver { } // HTP - const mergedData = new SACNData(); + const mergedData: Payload = {}; for (const [, data] of universeData.servers) { if (data.prio === maximumPrio && data.universe === packet.universe) { - let i = 0; - while (i < 512) { - const newValue = data.data.data[i] || 0; - if ((mergedData.data[i] ?? 0) < newValue) { - mergedData.data[i] = newValue; + for (let i = 1; i <= 512; i += 1) { + const newValue = data.data[i] || 0; + if ((mergedData[i] ?? 0) < newValue) { + mergedData[i] = newValue; } - i += 1; } } } // only changes - let i = 0; - while (i < 512) { - if (universeData.lastData.data[i] !== mergedData.data[i]) { + for (let i = 1; i <= 512; i += 1) { + if (universeData.lastData[i] !== mergedData[i]) { super.emit('changed', { universe: packet.universe, - addr: i + 1, - newValue: mergedData.data[i], - oldValue: universeData.lastData.data[i], + addr: i, + newValue: mergedData[i], + oldValue: universeData.lastData[i], }); } - universeData.lastData.data[i] = mergedData.data[i] || 0; - i += 1; + universeData.lastData[i] = mergedData[i] || 0; } super.emit('changesDone'); } @@ -130,7 +126,7 @@ export class ReceiverMerge extends Receiver { clearCache() { // causes every addr value to be emitted for (const [, univese] of this.data) { - univese.lastData = new SACNData(); + univese.lastData = {}; } } } diff --git a/src/sACNData.ts b/src/sACNData.ts deleted file mode 100644 index bab9dab..0000000 --- a/src/sACNData.ts +++ /dev/null @@ -1,10 +0,0 @@ -export class SACNData { - data: number[] = new Array(512); - - constructor(recordData: Record = {}) { - this.data.fill(0); - for (const addr in recordData) { - this.data[+addr - 1] = recordData[+addr] ?? 0; - } - } -} From 70c6ae8a79573d058eae58a4a56b78cc4be4cda5 Mon Sep 17 00:00:00 2001 From: Kyle Hensel Date: Mon, 11 Mar 2024 19:58:42 +1100 Subject: [PATCH 13/15] expand abbreviations --- src/merge.ts | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/merge.ts b/src/merge.ts index bf74dbf..afb4b72 100644 --- a/src/merge.ts +++ b/src/merge.ts @@ -19,8 +19,8 @@ interface Universe { interface SendersData { readonly cid: string; readonly data: Payload; - readonly prio: number; - readonly seq: number; + readonly priority: number; + readonly sequence: number; readonly universe: number; readonly lastTimestamp: number; } @@ -70,8 +70,8 @@ export class ReceiverMerge extends Receiver { universeData.servers.set(cid, { cid: packet.cid.toString(), data: packet.payload, - prio: packet.priority, - seq: packet.sequence, + priority: packet.priority, + sequence: packet.sequence, universe: packet.universe, lastTimestamp: ts, }); @@ -90,15 +90,15 @@ export class ReceiverMerge extends Receiver { // detect which source has the highest per-universe priority let maximumPrio = 0; for (const [, data] of universeData.servers) { - if (data.prio > maximumPrio && data.universe === packet.universe) { - maximumPrio = data.prio; + if (data.priority > maximumPrio && data.universe === packet.universe) { + maximumPrio = data.priority; } } // HTP const mergedData: Payload = {}; for (const [, data] of universeData.servers) { - if (data.prio === maximumPrio && data.universe === packet.universe) { + if (data.priority === maximumPrio && data.universe === packet.universe) { for (let i = 1; i <= 512; i += 1) { const newValue = data.data[i] || 0; if ((mergedData[i] ?? 0) < newValue) { From 09ec953ee36554d29811da5fc5e82dc40e2ef359 Mon Sep 17 00:00:00 2001 From: Kyle Hensel Date: Mon, 11 Mar 2024 19:58:58 +1100 Subject: [PATCH 14/15] use the same structure for `packet` as existing code --- src/merge.ts | 40 +++++++++++++++++++--------------------- 1 file changed, 19 insertions(+), 21 deletions(-) diff --git a/src/merge.ts b/src/merge.ts index afb4b72..d9c4f60 100644 --- a/src/merge.ts +++ b/src/merge.ts @@ -13,15 +13,11 @@ interface MergeProps { interface Universe { lastData: Payload; - servers: Map; + servers: Map; } -interface SendersData { - readonly cid: string; - readonly data: Payload; - readonly priority: number; - readonly sequence: number; - readonly universe: number; +interface PacketWithTimestamp { + readonly packet: Packet; readonly lastTimestamp: number; } @@ -36,7 +32,7 @@ export class ReceiverMerge extends Receiver { protected data = new Map(); - mergePacket(packet: Packet) { + public mergePacket(packet: Packet) { const universe = packet.universe.toString(36); const cid = packet.cid.toString(); @@ -68,11 +64,7 @@ export class ReceiverMerge extends Receiver { const ts = performance.now(); universeData.servers.set(cid, { - cid: packet.cid.toString(), - data: packet.payload, - priority: packet.priority, - sequence: packet.sequence, - universe: packet.universe, + packet, lastTimestamp: ts, }); @@ -88,19 +80,25 @@ export class ReceiverMerge extends Receiver { }, this.timeout); // detect which source has the highest per-universe priority - let maximumPrio = 0; - for (const [, data] of universeData.servers) { - if (data.priority > maximumPrio && data.universe === packet.universe) { - maximumPrio = data.priority; + let maximumPriority = 0; + for (const [, { packet: thisPacket }] of universeData.servers) { + if ( + thisPacket.priority > maximumPriority && + thisPacket.universe === packet.universe + ) { + maximumPriority = thisPacket.priority; } } // HTP const mergedData: Payload = {}; - for (const [, data] of universeData.servers) { - if (data.priority === maximumPrio && data.universe === packet.universe) { + for (const [, { packet: thisPacket }] of universeData.servers) { + if ( + thisPacket.priority === maximumPriority && + thisPacket.universe === packet.universe + ) { for (let i = 1; i <= 512; i += 1) { - const newValue = data.data[i] || 0; + const newValue = thisPacket.payload[i] || 0; if ((mergedData[i] ?? 0) < newValue) { mergedData[i] = newValue; } @@ -123,7 +121,7 @@ export class ReceiverMerge extends Receiver { super.emit('changesDone'); } - clearCache() { + public clearCache() { // causes every addr value to be emitted for (const [, univese] of this.data) { univese.lastData = {}; From 7609cfc1c3116885f1063de64fcff5e904bc9257 Mon Sep 17 00:00:00 2001 From: Kyle Hensel Date: Mon, 11 Mar 2024 20:12:01 +1100 Subject: [PATCH 15/15] =?UTF-8?q?pre=C3=ABmptively=20rename=20conflicting?= =?UTF-8?q?=20file?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/index.ts | 1 - src/{merge.ts => receiver/abstract.ts} | 6 +++--- 2 files changed, 3 insertions(+), 4 deletions(-) rename src/{merge.ts => receiver/abstract.ts} (97%) diff --git a/src/index.ts b/src/index.ts index 06232f4..4e7b8f5 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,4 +1,3 @@ export * from './packet'; export * from './receiver'; export * from './sender'; -export * from './merge'; diff --git a/src/merge.ts b/src/receiver/abstract.ts similarity index 97% rename from src/merge.ts rename to src/receiver/abstract.ts index d9c4f60..2eb84eb 100644 --- a/src/merge.ts +++ b/src/receiver/abstract.ts @@ -1,7 +1,7 @@ import { AssertionError } from 'assert'; -import { Packet } from './packet'; -import { Receiver } from './receiver'; -import type { Payload } from './util'; +import { Packet } from '../packet'; +import { Receiver } from '../receiver'; +import type { Payload } from '../util'; interface MergeProps { universes?: number[];