Skip to content

Commit

Permalink
Support merging (#43)
Browse files Browse the repository at this point in the history
  • Loading branch information
hansSchall authored Mar 11, 2024
1 parent 9acb846 commit b494a9b
Show file tree
Hide file tree
Showing 2 changed files with 179 additions and 0 deletions.
167 changes: 167 additions & 0 deletions src/receiver/abstract.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
import { AssertionError } from 'assert';
import { Packet } from '../packet';
import { Receiver } from '../receiver';
import type { Payload } from '../util';

interface MergeProps {
universes?: number[];
port?: number;
iface?: string;
reuseAddr?: boolean;
timeout?: number;
}

interface Universe {
lastData: Payload;
servers: Map<string, PacketWithTimestamp>;
}

interface PacketWithTimestamp {
readonly packet: Packet;
readonly lastTimestamp: number;
}

export class ReceiverMerge extends Receiver {
constructor({ timeout = 5000, ...props }: MergeProps) {
super(props);
this.timeout = timeout;
super.on('packet', this.mergePacket);
}

readonly timeout: number;

protected data = new Map<string, Universe>();

public mergePacket(packet: Packet) {
const universe = packet.universe.toString(36);
const cid = packet.cid.toString();

if (!this.data.has(universe)) {
this.data.set(universe, {
lastData: [],
servers: new Map(),
});
this.emit('newUniverse', {
universe: packet.universe,
firstPacket: packet,
});
}

const universeData = this.data.get(universe);

if (!universeData) {
throw new Error('[sACN] Internal Error: universeData is undefined');
}

if (!universeData.servers.has(cid)) {
this.emit('senderConnect', {
cid: packet.cid,
universe: packet.universe,
firstPacket: packet,
});
}

const ts = performance.now();

universeData.servers.set(cid, {
packet,
lastTimestamp: ts,
});

setTimeout(() => {
if (universeData.servers.get(cid)?.lastTimestamp === ts) {
universeData.servers.delete(cid);
this.emit('senderDisconnect', {
cid: packet.cid,
universe: packet.universe,
lastPacket: packet,
});
}
}, this.timeout);

// detect which source has the highest per-universe priority
let maximumPriority = 0;
for (const [, { packet: thisPacket }] of universeData.servers) {
if (
thisPacket.priority > maximumPriority &&
thisPacket.universe === packet.universe
) {
maximumPriority = thisPacket.priority;
}
}

// HTP
const mergedData: Payload = {};
for (const [, { packet: thisPacket }] of universeData.servers) {
if (
thisPacket.priority === maximumPriority &&
thisPacket.universe === packet.universe
) {
for (let i = 1; i <= 512; i += 1) {
const newValue = thisPacket.payload[i] || 0;
if ((mergedData[i] ?? 0) < newValue) {
mergedData[i] = newValue;
}
}
}
}

// only changes
for (let i = 1; i <= 512; i += 1) {
if (universeData.lastData[i] !== mergedData[i]) {
super.emit('changed', {
universe: packet.universe,
addr: i,
newValue: mergedData[i],
oldValue: universeData.lastData[i],
});
}
universeData.lastData[i] = mergedData[i] || 0;
}
super.emit('changesDone');
}

public clearCache() {
// causes every addr value to be emitted
for (const [, univese] of this.data) {
univese.lastData = {};
}
}
}
export declare interface ReceiverMerge {
// on(event: string, listener: (...args: any[]) => void): this;
on(
event: Parameters<Receiver['on']>[0],
listener: Parameters<Receiver['on']>[1],
): this;
on(
event: 'changed',
listener: (ev: {
universe: number;
addr: number;
newValue: number;
oldValue: number;
}) => void,
): this;
on(event: 'changesDone', listener: () => void): this;
on(
event: 'senderConnect',
listener: (ev: {
cid: number;
universe: number;
firstPacket: Packet;
}) => void,
): this;
on(
event: 'senderDisconnect',
listener: (ev: {
cid: number;
universe: number;
lastPacket: Packet;
}) => void,
): this;
on(event: 'packet', listener: (packet: Packet) => void): this;
on(event: 'PacketCorruption', listener: (err: AssertionError) => void): this;
on(event: 'PacketOutOfOrder', listener: (err: Error) => void): this;
on(event: 'error', listener: (err: Error) => void): this;
}
12 changes: 12 additions & 0 deletions test/logData.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import { ReceiverMerge } from '../src/index';

export async function main() {
const sacn = new ReceiverMerge({
universes: [1, 2],
reuseAddr: true,
});
sacn.on('changed', (ev) => {
console.log(ev.universe, ev.addr, Math.round(ev.newValue * 2.55));
});
}
main();

0 comments on commit b494a9b

Please sign in to comment.