Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support merging #43

Merged
merged 17 commits into from
Mar 11, 2024
167 changes: 167 additions & 0 deletions src/receiver/abstract.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
import { AssertionError } from 'assert';
import { Packet } from '../packet';
import { Receiver } from '../receiver';
import type { Payload } from '../util';

interface MergeProps {
universes?: number[];
port?: number;
iface?: string;
reuseAddr?: boolean;
timeout?: number;
}

interface Universe {
lastData: Payload;
servers: Map<string, PacketWithTimestamp>;
}

interface PacketWithTimestamp {
readonly packet: Packet;
readonly lastTimestamp: number;
}

export class ReceiverMerge extends Receiver {
constructor({ timeout = 5000, ...props }: MergeProps) {
super(props);
this.timeout = timeout;
super.on('packet', this.mergePacket);
}

readonly timeout: number;

protected data = new Map<string, Universe>();

public mergePacket(packet: Packet) {
const universe = packet.universe.toString(36);
const cid = packet.cid.toString();

if (!this.data.has(universe)) {
this.data.set(universe, {
lastData: [],
servers: new Map(),
});
this.emit('newUniverse', {
universe: packet.universe,
firstPacket: packet,
});
}

const universeData = this.data.get(universe);

if (!universeData) {
throw new Error('[sACN] Internal Error: universeData is undefined');
}

if (!universeData.servers.has(cid)) {
this.emit('senderConnect', {
cid: packet.cid,
universe: packet.universe,
firstPacket: packet,
});
}

const ts = performance.now();

universeData.servers.set(cid, {
packet,
lastTimestamp: ts,
});

setTimeout(() => {
if (universeData.servers.get(cid)?.lastTimestamp === ts) {
universeData.servers.delete(cid);
this.emit('senderDisconnect', {
cid: packet.cid,
universe: packet.universe,
lastPacket: packet,
});
}
}, this.timeout);

// detect which source has the highest per-universe priority
let maximumPriority = 0;
for (const [, { packet: thisPacket }] of universeData.servers) {
if (
thisPacket.priority > maximumPriority &&
thisPacket.universe === packet.universe
) {
maximumPriority = thisPacket.priority;
}
}

// HTP
const mergedData: Payload = {};
for (const [, { packet: thisPacket }] of universeData.servers) {
if (
thisPacket.priority === maximumPriority &&
thisPacket.universe === packet.universe
) {
for (let i = 1; i <= 512; i += 1) {
const newValue = thisPacket.payload[i] || 0;
if ((mergedData[i] ?? 0) < newValue) {
mergedData[i] = newValue;
}
}
}
}

// only changes
for (let i = 1; i <= 512; i += 1) {
if (universeData.lastData[i] !== mergedData[i]) {
super.emit('changed', {
universe: packet.universe,
addr: i,
newValue: mergedData[i],
oldValue: universeData.lastData[i],
});
}
universeData.lastData[i] = mergedData[i] || 0;
}
super.emit('changesDone');
}

public clearCache() {
// causes every addr value to be emitted
for (const [, univese] of this.data) {
univese.lastData = {};
}
}
}
export declare interface ReceiverMerge {
// on(event: string, listener: (...args: any[]) => void): this;
on(
event: Parameters<Receiver['on']>[0],
listener: Parameters<Receiver['on']>[1],
): this;
on(
event: 'changed',
listener: (ev: {
universe: number;
addr: number;
newValue: number;
oldValue: number;
}) => void,
): this;
on(event: 'changesDone', listener: () => void): this;
on(
event: 'senderConnect',
listener: (ev: {
cid: number;
universe: number;
firstPacket: Packet;
}) => void,
): this;
on(
event: 'senderDisconnect',
listener: (ev: {
cid: number;
universe: number;
lastPacket: Packet;
}) => void,
): this;
on(event: 'packet', listener: (packet: Packet) => void): this;
on(event: 'PacketCorruption', listener: (err: AssertionError) => void): this;
on(event: 'PacketOutOfOrder', listener: (err: Error) => void): this;
on(event: 'error', listener: (err: Error) => void): this;
}
12 changes: 12 additions & 0 deletions test/logData.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import { ReceiverMerge } from '../src/index';

export async function main() {
const sacn = new ReceiverMerge({
universes: [1, 2],
reuseAddr: true,
});
sacn.on('changed', (ev) => {
console.log(ev.universe, ev.addr, Math.round(ev.newValue * 2.55));
});
}
main();
Loading