Skip to content

Commit

Permalink
implement htp and ltp merging receivers
Browse files Browse the repository at this point in the history
cherypick changes from #43 into this branch
  • Loading branch information
k-yle committed Mar 11, 2024
1 parent b494a9b commit 5689718
Show file tree
Hide file tree
Showing 6 changed files with 268 additions and 81 deletions.
2 changes: 2 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
export * from './packet';
export * from './receiver';
export * from './receiver/htp';
export * from './receiver/ltp';
export * from './sender';
98 changes: 98 additions & 0 deletions src/receiver/__tests__/merging.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
import { Options, Packet } from '../../packet';
import { PacketWithTime } from '../../types/merging';
import { HTPMergingReceiver } from '../htp';
import { LTPMergingReceiver } from '../ltp';

/** helper function to make it easier to create the Map */
function createPackets(
latestPackets: Record<
string,
Omit<Options, 'sequence' | 'universe'> & { timestamp?: number }
>,
) {
const latestPacketsMap = new Map<string, PacketWithTime>();
for (const server in latestPackets) {
const { timestamp = NaN, ...options } = latestPackets[server]!;
latestPacketsMap.set(server, {
packet: new Packet({ ...options, universe: 14, sequence: NaN }),
timestamp,
});
}
return latestPacketsMap;
}

describe('htp', () => {
const reciever = new HTPMergingReceiver({ universes: [14], reuseAddr: true });
// @ts-expect-error -- `protected` is not enforced at runtime.
// This is a hack to make the test simpler.
const merge = reciever.mergeData;

afterAll(() => {
reciever.close();
});

it('merges using htp', () => {
const merged = merge({
maximumPriority: 124,
universe: 14,
universeData: {
referenceData: [NaN, 1, 2, 3, 4, 5],
servers: createPackets({
'main console': { priority: 124, payload: [NaN, 101, 102, 103] },
'backup console': {
priority: 123,
payload: [NaN, 201, 202, 203, 204],
},
}),
},
});
expect(merged).toStrictEqual({
// HTP applies to the whole packet, so channel 4 is 0, instead of 204
// referenceData is irrelevant for HTP, so channel 5 is 0, instead of 5
1: 101,
2: 102,
3: 103,
});
});
});

describe('ltp', () => {
const reciever = new LTPMergingReceiver({ universes: [14], reuseAddr: true });

// @ts-expect-error -- `protected` is not enforced at runtime.
// This is a hack to make the test simpler.
const merge = reciever.mergeData;

afterAll(() => {
reciever.close();
});

it('merges using ltp', () => {
const merged = merge({
maximumPriority: 124,
universe: 14,
universeData: {
referenceData: [NaN, 1, 2, 3, 4, 5],
servers: createPackets({
'main console': {
priority: 124,
payload: [NaN, 101, 102, 103],
timestamp: 1241,
},
'backup console': {
priority: 123,
payload: [NaN, 201, 202, 203, 204],
timestamp: 1241,
},
}),
},
});
expect(merged).toStrictEqual({
// HTP applies to the whole packet, so channel 4 is 0, instead of 204
// referenceData is irrelevant for HTP, so channel 5 is 0, instead of 5
1: 101,
2: 102,
3: 103,
});
});
});
157 changes: 76 additions & 81 deletions src/receiver/abstract.ts
Original file line number Diff line number Diff line change
@@ -1,79 +1,83 @@
import { AssertionError } from 'assert';
import { Packet } from '../packet';
import { Receiver } from '../receiver';
import { Receiver, ReceiverProps } from '../receiver';
import {
MergingReceiverChannelChanged,
MergingReceiverSenderConnect,
MergingReceiverSenderDisconnect,
MergingReceiverUniverseChanged,
PreparedData,
UniverseData,
} from '../types/merging';
import type { Payload } from '../util';

interface MergeProps {
universes?: number[];
port?: number;
iface?: string;
reuseAddr?: boolean;
export interface MergingReceiverProps extends ReceiverProps {
timeout?: number;
}

interface Universe {
lastData: Payload;
servers: Map<string, PacketWithTimestamp>;
}
export abstract class AbstractMergingReceiver extends Receiver {
protected readonly timeout: number;

interface PacketWithTimestamp {
readonly packet: Packet;
readonly lastTimestamp: number;
}
protected data = new Map<number, UniverseData>();

export class ReceiverMerge extends Receiver {
constructor({ timeout = 5000, ...props }: MergeProps) {
constructor({ timeout = 5000, ...props }: MergingReceiverProps) {
super(props);
this.timeout = timeout;
super.on('packet', this.mergePacket);
}

readonly timeout: number;
this.timeout = timeout;

protected data = new Map<string, Universe>();
super.on('packet', (packet) => {
const data = this.prepareData(packet);
const mergedData = this.mergeData(data);
this.handleChanges(data, mergedData);
});
}

public mergePacket(packet: Packet) {
const universe = packet.universe.toString(36);
protected prepareData(packet: Packet): PreparedData {
const currentTime = performance.now();
const universe = parseInt(packet.universe.toString(36), 10);
const cid = packet.cid.toString();

// universe is unknown
if (!this.data.has(universe)) {
this.data.set(universe, {
lastData: [],
referenceData: {},
servers: new Map(),
});

this.emit('newUniverse', {
universe: packet.universe,
universe,
firstPacket: packet,
});
}

const universeData = this.data.get(universe);

if (!universeData) {
throw new Error('[sACN] Internal Error: universeData is undefined');
}

// sender is unknown for this universe
if (!universeData.servers.has(cid)) {
this.emit('senderConnect', {
cid: packet.cid,
universe: packet.universe,
universe,
firstPacket: packet,
});
}

const ts = performance.now();

// register current package
universeData.servers.set(cid, {
packet,
lastTimestamp: ts,
timestamp: currentTime,
});

// check whether sender disconnects
setTimeout(() => {
if (universeData.servers.get(cid)?.lastTimestamp === ts) {
if (universeData.servers.get(cid)?.timestamp === currentTime) {
universeData.servers.delete(cid);

this.emit('senderDisconnect', {
cid: packet.cid,
universe: packet.universe,
universe,
lastPacket: packet,
});
}
Expand All @@ -90,75 +94,66 @@ export class ReceiverMerge extends Receiver {
}
}

// HTP
const mergedData: Payload = {};
for (const [, { packet: thisPacket }] of universeData.servers) {
if (
thisPacket.priority === maximumPriority &&
thisPacket.universe === packet.universe
) {
for (let i = 1; i <= 512; i += 1) {
const newValue = thisPacket.payload[i] || 0;
if ((mergedData[i] ?? 0) < newValue) {
mergedData[i] = newValue;
}
}
}
}
return {
universe,
maximumPriority,
universeData,
};
}

protected abstract mergeData(packet: PreparedData): Payload;

protected handleChanges(data: PreparedData, mergedData: Payload): void {
const { referenceData } = data.universeData;

// only changes
for (let i = 1; i <= 512; i += 1) {
if (universeData.lastData[i] !== mergedData[i]) {
super.emit('changed', {
universe: packet.universe,
addr: i,
newValue: mergedData[i],
oldValue: universeData.lastData[i],
});
let changesDetected = false;
for (let ch = 1; ch <= 512; ch += 1) {
if (referenceData[ch] !== mergedData[ch]) {
changesDetected = true;

const event: MergingReceiverChannelChanged = {
universe: data.universe,
address: ch,
newValue: mergedData[ch]!,
oldValue: referenceData[ch]!,
};
super.emit('changedValue', event);
}
universeData.lastData[i] = mergedData[i] || 0;
}
super.emit('changesDone');
}

public clearCache() {
// causes every addr value to be emitted
for (const [, univese] of this.data) {
univese.lastData = {};
if (changesDetected) {
this.data.get(data.universe)!.referenceData = mergedData;

const event: MergingReceiverUniverseChanged = {
universe: data.universe,
payload: mergedData,
};
super.emit('changed', event);
}
}
}
export declare interface ReceiverMerge {
// on(event: string, listener: (...args: any[]) => void): this;

export declare interface AbstractMergingReceiver {
on(
event: Parameters<Receiver['on']>[0],
listener: Parameters<Receiver['on']>[1],
): this;
on(
event: 'changedValue',
listener: (ev: MergingReceiverChannelChanged) => void,
): this;
on(
event: 'changed',
listener: (ev: {
universe: number;
addr: number;
newValue: number;
oldValue: number;
}) => void,
listener: (ev: MergingReceiverUniverseChanged) => void,
): this;
on(event: 'changesDone', listener: () => void): this;
on(
event: 'senderConnect',
listener: (ev: {
cid: number;
universe: number;
firstPacket: Packet;
}) => void,
listener: (ev: MergingReceiverSenderConnect) => void,
): this;
on(
event: 'senderDisconnect',
listener: (ev: {
cid: number;
universe: number;
lastPacket: Packet;
}) => void,
listener: (ev: MergingReceiverSenderDisconnect) => void,
): this;
on(event: 'packet', listener: (packet: Packet) => void): this;
on(event: 'PacketCorruption', listener: (err: AssertionError) => void): this;
Expand Down
26 changes: 26 additions & 0 deletions src/receiver/htp.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import { AbstractMergingReceiver } from './abstract';
import { PreparedData } from '../types/merging';
import type { Payload } from '../util';

export class HTPMergingReceiver extends AbstractMergingReceiver {
// eslint-disable-next-line class-methods-use-this
protected mergeData(data: PreparedData): Payload {
const mergedData: Payload = {};

for (const [, { packet }] of data.universeData.servers) {
if (
packet.priority === data.maximumPriority &&
packet.universe === data.universe
) {
for (let ch = 1; ch <= 512; ch += 1) {
const newValue = packet.payload[ch] || 0;
if ((mergedData[ch] ?? 0) < newValue) {
mergedData[ch] = newValue;
}
}
}
}

return mergedData;
}
}
24 changes: 24 additions & 0 deletions src/receiver/ltp.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import { PreparedData } from '../types/merging';
import { AbstractMergingReceiver } from './abstract';
import type { Payload } from '../util';

export class LTPMergingReceiver extends AbstractMergingReceiver {
// eslint-disable-next-line class-methods-use-this
protected mergeData(data: PreparedData): Payload {
const { referenceData } = data.universeData;
const mergedData: Payload = {};

for (let ch = 1; ch <= 512; ch += 1) {
const referenceTime = 0;
mergedData[ch] = referenceData[ch] || 0;

for (const [, { packet, timestamp }] of data.universeData.servers) {
if (timestamp > referenceTime) {
mergedData[ch] = packet.payload[ch] || mergedData[ch]!;
}
}
}

return mergedData;
}
}
Loading

0 comments on commit 5689718

Please sign in to comment.