Skip to content

Commit

Permalink
implement htp and ltp merging receivers (#58)
Browse files Browse the repository at this point in the history
Co-authored-by: Kyle Hensel <[email protected]>
  • Loading branch information
mutec and k-yle authored Jul 22, 2024
1 parent e23fcd1 commit 6fdc29b
Show file tree
Hide file tree
Showing 3 changed files with 318 additions and 56 deletions.
2 changes: 2 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
export * from './packet';
export * from './receiver';
// eslint-disable-next-line camelcase
export { MergingReceiver as unstable_MergingReceiver } from './merging';
export * from './sender';
237 changes: 181 additions & 56 deletions src/merging.ts
Original file line number Diff line number Diff line change
@@ -1,27 +1,65 @@
import { performance } from 'node:perf_hooks';
import { Packet } from './packet';
import { performance } from 'perf_hooks';
import type { Packet } from './packet';
import { Receiver } from './receiver';
import type { Payload } from './util';

interface Universe {
lastData: Payload;
servers: Map<string, PacketWithTimestamp>;
}

interface PacketWithTimestamp {
readonly packet: Packet;
readonly lastTimestamp: number;
}
/**
* @deprecated CAUTION: This feature is experimental,
* and has not been thoroughly tested. It may not behave
* correctly. There is no guarantee that it adheres to
* the E1.33 standard.
*/
export namespace MergingReceiver {
/** See {@link Props.mode here} for docs */
export type Mode = 'HTP' | 'LTP';

export namespace ReceiverMerge {
export interface Props extends Receiver.Props {
/**
* ### Different priority
* .
* When merging, all senders should normally have a different
* `priority`. Following this rule will prevent most of the
* confusion around merging.
*
* 💡 _Use case: tracking-backup console._
*
* ### Same priority
* .
* If there are 2 senders with the same `priority`,
* then you need to specify the merging mode:
*
* - `HTP` = **H**ighest **t**akes **P**riority. This means
* that the receiver will use the highest channel value from
* all senders with the same `priority`. If there is a
* malfunction, channels may appear to be stuck, even when
* blacked-out on one console.
* 💡 _Use case: {@link https://youtu.be/vygFW9FDYtM parking a channel}
* or controlling {@link https://en.wiktionary.org/wiki/houselights houselights}
* from a different console._
*
* - `LTP` = **L**atest **t**akes **P**riority. This means that
* the receiver will use the latest data that it receives from
* the senders with the highest `priority`. **This options is
* not recomended, because a malfunction will cause of lights
* to flicker uncontrollably.**
* 💡 _Use case: none._
*
* ℹ️ Please refer to the README for more information.
*
* @default 'HTP'
*/
mode?: Mode;
timeout?: number;
}

export interface EventMap extends Receiver.EventMap {
changed: {
universe: number;
addr: number;
payload: Payload;
};
changedValue: {
universe: number;
address: number;
newValue: number;
oldValue: number;
};
Expand All @@ -37,68 +75,101 @@ export namespace ReceiverMerge {
lastPacket: Packet;
};
}

export interface PacketWithTime {
readonly packet: Packet;
readonly timestamp: number;
}

export interface UniverseData {
referenceData: Payload;
servers: Map<string, PacketWithTime>;
}

export interface PreparedData {
universe: number;
maximumPriority: number;
universeData: UniverseData;
}
}

export declare interface ReceiverMerge {
export declare interface MergingReceiver {
on<K extends keyof Receiver.EventMap>(
type: K,
listener: (event: Receiver.EventMap[K]) => void,
): this;
}

export class ReceiverMerge extends Receiver {
constructor({ timeout = 5000, ...props }: ReceiverMerge.Props) {
export class MergingReceiver extends Receiver {
private readonly mode: MergingReceiver.Mode;

private readonly timeout: number;

private data = new Map<number, MergingReceiver.UniverseData>();

constructor({
mode = 'HTP',
timeout = 5000,
...props
}: MergingReceiver.Props) {
super(props);
this.timeout = timeout;
super.on('packet', this.mergePacket);
}

readonly timeout: number;
this.mode = mode;
this.timeout = timeout;

protected data = new Map<string, Universe>();
super.on('packet', (packet) => {
const data = this.prepareData(packet);
const mergedData = MergingReceiver[this.mode](data);
this.handleChanges(data, mergedData);
});
}

public mergePacket(packet: Packet) {
const universe = packet.universe.toString(36);
private prepareData(packet: Packet): MergingReceiver.PreparedData {
const currentTime = performance.now();
const universe = parseInt(packet.universe.toString(36), 10);
const cid = packet.cid.toString();

// universe is unknown
if (!this.data.has(universe)) {
this.data.set(universe, {
lastData: [],
referenceData: {},
servers: new Map(),
});

this.emit('newUniverse', {
universe: packet.universe,
universe,
firstPacket: packet,
});
}

const universeData = this.data.get(universe);

if (!universeData) {
throw new Error('[sACN] Internal Error: universeData is undefined');
}

// sender is unknown for this universe
if (!universeData.servers.has(cid)) {
this.emit('senderConnect', {
cid: packet.cid,
universe: packet.universe,
universe,
firstPacket: packet,
});
}

const ts = performance.now();

// register current package
universeData.servers.set(cid, {
packet,
lastTimestamp: ts,
timestamp: currentTime,
});

// check whether sender disconnects
setTimeout(() => {
if (universeData.servers.get(cid)?.lastTimestamp === ts) {
if (universeData.servers.get(cid)?.timestamp === currentTime) {
universeData.servers.delete(cid);

this.emit('senderDisconnect', {
cid: packet.cid,
universe: packet.universe,
universe,
lastPacket: packet,
});
}
Expand All @@ -115,41 +186,95 @@ export class ReceiverMerge extends Receiver {
}
}

// HTP
return {
universe,
maximumPriority,
universeData,
};
}

private handleChanges(
data: MergingReceiver.PreparedData,
mergedData: Payload,
): void {
const { referenceData } = data.universeData;

// only changes
let changesDetected = false;
for (let ch = 1; ch <= 512; ch += 1) {
if (referenceData[ch] !== mergedData[ch]) {
changesDetected = true;

const event: MergingReceiver.EventMap['changedValue'] = {
universe: data.universe,
address: ch,
newValue: mergedData[ch]!,
oldValue: referenceData[ch]!,
};
super.emit('changedValue', event);
}
}

if (changesDetected) {
this.data.get(data.universe)!.referenceData = mergedData;

const event: MergingReceiver.EventMap['changed'] = {
universe: data.universe,
payload: mergedData,
};
super.emit('changed', event);
}
}

public static HTP(data: MergingReceiver.PreparedData): Payload {
const mergedData: Payload = {};
for (const [, { packet: thisPacket }] of universeData.servers) {

for (const [, { packet }] of data.universeData.servers) {
if (
thisPacket.priority === maximumPriority &&
thisPacket.universe === packet.universe
packet.priority === data.maximumPriority &&
packet.universe === data.universe
) {
for (let i = 1; i <= 512; i += 1) {
const newValue = thisPacket.payload[i] || 0;
if ((mergedData[i] ?? 0) < newValue) {
mergedData[i] = newValue;
for (let ch = 1; ch <= 512; ch += 1) {
const newValue = packet.payload[ch] || 0;
if ((mergedData[ch] ?? 0) < newValue) {
mergedData[ch] = newValue;
}
}
}
}

// only changes
for (let i = 1; i <= 512; i += 1) {
if (universeData.lastData[i] !== mergedData[i]) {
super.emit('changed', {
universe: packet.universe,
addr: i,
newValue: mergedData[i],
oldValue: universeData.lastData[i],
});
return mergedData;
}

/**
* LTP can only operate per-universe, not per-channel. There is no
* situation where LTP-per-channel would be useful.
*
* Therefore, this function just returns the packet with the highest
* priority and the latest timestamp.
*/
public static LTP(data: MergingReceiver.PreparedData): Payload {
let maximumTimestamp = -Infinity;
for (const [, { packet, timestamp }] of data.universeData.servers) {
if (
packet.priority === data.maximumPriority &&
packet.universe === data.universe &&
timestamp > maximumTimestamp
) {
maximumTimestamp = timestamp;
}
universeData.lastData[i] = mergedData[i] || 0;
}
super.emit('changesDone');
}

public clearCache() {
// causes every addr value to be emitted
for (const [, univese] of this.data) {
univese.lastData = {};
for (const [, { packet, timestamp }] of data.universeData.servers) {
if (
packet.priority === data.maximumPriority &&
packet.universe === data.universe &&
timestamp === maximumTimestamp
) {
return packet.payload;
}
}

throw new Error('Internal error');
}
}
Loading

0 comments on commit 6fdc29b

Please sign in to comment.