Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement htp and ltp merging receivers #58

Merged
merged 3 commits into from
Jul 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
export * from './packet';
export * from './receiver';
// eslint-disable-next-line camelcase
export { MergingReceiver as unstable_MergingReceiver } from './merging';
export * from './sender';
237 changes: 181 additions & 56 deletions src/merging.ts
Original file line number Diff line number Diff line change
@@ -1,27 +1,65 @@
import { performance } from 'node:perf_hooks';
import { Packet } from './packet';
import { performance } from 'perf_hooks';
import type { Packet } from './packet';
import { Receiver } from './receiver';
import type { Payload } from './util';

interface Universe {
lastData: Payload;
servers: Map<string, PacketWithTimestamp>;
}

interface PacketWithTimestamp {
readonly packet: Packet;
readonly lastTimestamp: number;
}
/**
* @deprecated CAUTION: This feature is experimental,
* and has not been thoroughly tested. It may not behave
* correctly. There is no guarantee that it adheres to
* the E1.33 standard.
*/
export namespace MergingReceiver {
/** See {@link Props.mode here} for docs */
export type Mode = 'HTP' | 'LTP';

export namespace ReceiverMerge {
export interface Props extends Receiver.Props {
/**
* ### Different priority
* .
* When merging, all senders should normally have a different
* `priority`. Following this rule will prevent most of the
* confusion around merging.
*
* 💡 _Use case: tracking-backup console._
*
* ### Same priority
* .
* If there are 2 senders with the same `priority`,
* then you need to specify the merging mode:
*
* - `HTP` = **H**ighest **t**akes **P**riority. This means
* that the receiver will use the highest channel value from
* all senders with the same `priority`. If there is a
* malfunction, channels may appear to be stuck, even when
* blacked-out on one console.
* 💡 _Use case: {@link https://youtu.be/vygFW9FDYtM parking a channel}
* or controlling {@link https://en.wiktionary.org/wiki/houselights houselights}
* from a different console._
*
* - `LTP` = **L**atest **t**akes **P**riority. This means that
* the receiver will use the latest data that it receives from
* the senders with the highest `priority`. **This options is
* not recomended, because a malfunction will cause of lights
* to flicker uncontrollably.**
* 💡 _Use case: none._
*
* ℹ️ Please refer to the README for more information.
*
* @default 'HTP'
*/
mode?: Mode;
timeout?: number;
}

export interface EventMap extends Receiver.EventMap {
changed: {
universe: number;
addr: number;
payload: Payload;
};
changedValue: {
universe: number;
address: number;
newValue: number;
oldValue: number;
};
Expand All @@ -37,68 +75,101 @@ export namespace ReceiverMerge {
lastPacket: Packet;
};
}

export interface PacketWithTime {
readonly packet: Packet;
readonly timestamp: number;
}

export interface UniverseData {
referenceData: Payload;
servers: Map<string, PacketWithTime>;
}

export interface PreparedData {
universe: number;
maximumPriority: number;
universeData: UniverseData;
}
}

export declare interface ReceiverMerge {
export declare interface MergingReceiver {
on<K extends keyof Receiver.EventMap>(
type: K,
listener: (event: Receiver.EventMap[K]) => void,
): this;
}

export class ReceiverMerge extends Receiver {
constructor({ timeout = 5000, ...props }: ReceiverMerge.Props) {
export class MergingReceiver extends Receiver {
private readonly mode: MergingReceiver.Mode;

private readonly timeout: number;

private data = new Map<number, MergingReceiver.UniverseData>();

constructor({
mode = 'HTP',
timeout = 5000,
...props
}: MergingReceiver.Props) {
super(props);
this.timeout = timeout;
super.on('packet', this.mergePacket);
}

readonly timeout: number;
this.mode = mode;
this.timeout = timeout;

protected data = new Map<string, Universe>();
super.on('packet', (packet) => {
const data = this.prepareData(packet);
const mergedData = MergingReceiver[this.mode](data);
this.handleChanges(data, mergedData);
});
}

public mergePacket(packet: Packet) {
const universe = packet.universe.toString(36);
private prepareData(packet: Packet): MergingReceiver.PreparedData {
const currentTime = performance.now();
const universe = parseInt(packet.universe.toString(36), 10);
const cid = packet.cid.toString();

// universe is unknown
if (!this.data.has(universe)) {
this.data.set(universe, {
lastData: [],
referenceData: {},
servers: new Map(),
});

this.emit('newUniverse', {
universe: packet.universe,
universe,
firstPacket: packet,
});
}

const universeData = this.data.get(universe);

if (!universeData) {
throw new Error('[sACN] Internal Error: universeData is undefined');
}

// sender is unknown for this universe
if (!universeData.servers.has(cid)) {
this.emit('senderConnect', {
cid: packet.cid,
universe: packet.universe,
universe,
firstPacket: packet,
});
}

const ts = performance.now();

// register current package
universeData.servers.set(cid, {
packet,
lastTimestamp: ts,
timestamp: currentTime,
});

// check whether sender disconnects
setTimeout(() => {
if (universeData.servers.get(cid)?.lastTimestamp === ts) {
if (universeData.servers.get(cid)?.timestamp === currentTime) {
universeData.servers.delete(cid);

this.emit('senderDisconnect', {
cid: packet.cid,
universe: packet.universe,
universe,
lastPacket: packet,
});
}
Expand All @@ -115,41 +186,95 @@ export class ReceiverMerge extends Receiver {
}
}

// HTP
return {
universe,
maximumPriority,
universeData,
};
}

private handleChanges(
data: MergingReceiver.PreparedData,
mergedData: Payload,
): void {
const { referenceData } = data.universeData;

// only changes
let changesDetected = false;
for (let ch = 1; ch <= 512; ch += 1) {
if (referenceData[ch] !== mergedData[ch]) {
changesDetected = true;

const event: MergingReceiver.EventMap['changedValue'] = {
universe: data.universe,
address: ch,
newValue: mergedData[ch]!,
oldValue: referenceData[ch]!,
};
super.emit('changedValue', event);
}
}

if (changesDetected) {
this.data.get(data.universe)!.referenceData = mergedData;

const event: MergingReceiver.EventMap['changed'] = {
universe: data.universe,
payload: mergedData,
};
super.emit('changed', event);
}
}

public static HTP(data: MergingReceiver.PreparedData): Payload {
const mergedData: Payload = {};
for (const [, { packet: thisPacket }] of universeData.servers) {

for (const [, { packet }] of data.universeData.servers) {
if (
thisPacket.priority === maximumPriority &&
thisPacket.universe === packet.universe
packet.priority === data.maximumPriority &&
packet.universe === data.universe
) {
for (let i = 1; i <= 512; i += 1) {
const newValue = thisPacket.payload[i] || 0;
if ((mergedData[i] ?? 0) < newValue) {
mergedData[i] = newValue;
for (let ch = 1; ch <= 512; ch += 1) {
const newValue = packet.payload[ch] || 0;
if ((mergedData[ch] ?? 0) < newValue) {
mergedData[ch] = newValue;
}
}
}
}

// only changes
for (let i = 1; i <= 512; i += 1) {
if (universeData.lastData[i] !== mergedData[i]) {
super.emit('changed', {
universe: packet.universe,
addr: i,
newValue: mergedData[i],
oldValue: universeData.lastData[i],
});
return mergedData;
}

/**
* LTP can only operate per-universe, not per-channel. There is no
* situation where LTP-per-channel would be useful.
*
* Therefore, this function just returns the packet with the highest
* priority and the latest timestamp.
*/
public static LTP(data: MergingReceiver.PreparedData): Payload {
let maximumTimestamp = -Infinity;
for (const [, { packet, timestamp }] of data.universeData.servers) {
if (
packet.priority === data.maximumPriority &&
packet.universe === data.universe &&
timestamp > maximumTimestamp
) {
maximumTimestamp = timestamp;
}
universeData.lastData[i] = mergedData[i] || 0;
}
super.emit('changesDone');
}

public clearCache() {
// causes every addr value to be emitted
for (const [, univese] of this.data) {
univese.lastData = {};
for (const [, { packet, timestamp }] of data.universeData.servers) {
if (
packet.priority === data.maximumPriority &&
packet.universe === data.universe &&
timestamp === maximumTimestamp
) {
return packet.payload;
}
}

throw new Error('Internal error');
}
}
Loading
Loading