Skip to content

Commit

Permalink
implement htp and ltp merging receivers
Browse files Browse the repository at this point in the history
  • Loading branch information
mutec committed Mar 10, 2024
1 parent bfe007a commit a6ccad0
Show file tree
Hide file tree
Showing 6 changed files with 290 additions and 0 deletions.
49 changes: 49 additions & 0 deletions src/data/merging.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import { DMXUniverse } from './universe';
import { Packet } from '../packet';

export interface SenderData {
readonly cid: string;
readonly priority: number;
readonly universe: number;
readonly data: DMXUniverse;
readonly lastUpdate: number;
readonly sequence: number;
}

export interface UniverseData {
referenceData: DMXUniverse;
servers: Map<string, SenderData>;
}

export interface PreparedData {
universe: number;
maximumPriority: number;
universeData: UniverseData;
}

export interface ProcessedData extends PreparedData {
mergedData: DMXUniverse;
}

export interface MergingReceiverChannelChanged {
universe: number;
address: number;
newValue: number;
oldValue: number;
}

export interface MergingReceiverUniverseChanged {
universe: number;
payload: number[];
}

export interface MergingReceiverSenderConnect {
cid: number;
universe: number;
firstPacket: Packet;
}
export interface MergingReceiverSenderDisconnect {
cid: number;
universe: number;
lastPacket: Packet;
}
13 changes: 13 additions & 0 deletions src/data/universe.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
export class DMXUniverse {
readonly data: number[] = new Array(512);

constructor(recordData: Record<number, number> = {}) {
// init universe using `0` for every channel
this.data.fill(0);

// set provided values
for (const addr in recordData) {
this.data[+addr - 1] = recordData[+addr] ?? 0;
}
}
}
2 changes: 2 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
export * from './packet';
export * from './receiver';
export * from './receiver/htp';
export * from './receiver/ltp';
export * from './sender';
175 changes: 175 additions & 0 deletions src/receiver/abstract.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
import { AssertionError } from 'assert';
import { Receiver, ReceiverProps } from '../receiver';
import { DMXUniverse } from '../data/universe';
import { Packet } from '../packet';
import {
MergingReceiverChannelChanged,
MergingReceiverSenderConnect,
MergingReceiverSenderDisconnect,
MergingReceiverUniverseChanged,
PreparedData,
ProcessedData,
UniverseData,
} from '../data/merging';

export interface MergingReceiverProps extends ReceiverProps {
universes?: number[];
port?: number;
iface?: string;
reuseAddr?: boolean;
timeout?: number;
}

export abstract class AbstractMergingReceiver extends Receiver {
protected readonly timeout: number;

protected data = new Map<number, UniverseData>();

constructor({ timeout = 5000, ...props }: MergingReceiverProps) {
super(props);

this.timeout = timeout;

super.on('packet', (packet) => {
const data = this.prepareData(packet);
const mergedData = this.mergeData(data);
if (mergedData !== null) {
this.handleChanges({
...data,
mergedData,
});
}
});
}

protected prepareData(packet: Packet): PreparedData {
const currentTime = performance.now();
const universe = parseInt(packet.universe.toString(36), 10);
const cid = packet.cid.toString();

// universe is unknown
if (!this.data.has(universe)) {
this.data.set(universe, {
referenceData: new DMXUniverse(),
servers: new Map(),
});

this.emit('newUniverse', {
universe,
firstPacket: packet,
});
}

const universeData: UniverseData = this.data.get(universe) as UniverseData;
if (!universeData) {
throw new Error('[sACN] Internal Error: universeData is undefined');
}

// sender is unknown for this universe
if (!universeData.servers.has(cid)) {
this.emit('senderConnect', {
cid: packet.cid,
universe,
firstPacket: packet,
});
}

// register current package
universeData.servers.set(cid, {
cid: packet.cid.toString(),
priority: packet.priority,
universe,
data: new DMXUniverse(packet.payload),
lastUpdate: currentTime,
sequence: packet.sequence,
});

// check whether sender disconnects
setTimeout(() => {
if (universeData.servers.get(cid)?.lastUpdate === currentTime) {
universeData.servers.delete(cid);

this.emit('senderDisconnect', {
cid: packet.cid,
universe,
lastPacket: packet,
});
}
}, this.timeout);

// detect which source has the highest per-universe priority
let maximumPriority = 0;
for (const [, data] of universeData.servers) {
if (
data.priority > maximumPriority &&
data.universe === packet.universe
) {
maximumPriority = data.priority;
}
}

return {
universe,
maximumPriority,
universeData,
};
}

protected abstract mergeData(packet: PreparedData): DMXUniverse | null;

protected handleChanges(data: ProcessedData): void {
const { referenceData } = data.universeData;
const { mergedData } = data;

// only changes
let changesDetected = false;
for (let ch = 0; ch < 512; ch += 1) {
if (referenceData.data[ch] !== mergedData.data[ch]) {
changesDetected = true;

super.emit('changedValue', {
universe: data.universe,
address: ch + 1,
newValue: mergedData.data[ch],
oldValue: referenceData.data[ch],
} as MergingReceiverChannelChanged);
}
}

if (changesDetected) {
this.data.get(data.universe)!.referenceData = mergedData;

super.emit('changed', {
universe: data.universe,
payload: mergedData.data,
} as MergingReceiverUniverseChanged);
}
}
}

export declare interface AbstractMergingReceiver {
on(
event: Parameters<Receiver['on']>[0],
listener: Parameters<Receiver['on']>[1],
): this;
on(
event: 'changedValue',
listener: (ev: MergingReceiverChannelChanged) => void,
): this;
on(
event: 'changed',
listener: (ev: MergingReceiverUniverseChanged) => void,
): this;
on(
event: 'senderConnect',
listener: (ev: MergingReceiverSenderConnect) => void,
): this;
on(
event: 'senderDisconnect',
listener: (ev: MergingReceiverSenderDisconnect) => void,
): this;
on(event: 'packet', listener: (packet: Packet) => void): this;
on(event: 'PacketCorruption', listener: (err: AssertionError) => void): this;
on(event: 'PacketOutOfOrder', listener: (err: Error) => void): this;
on(event: 'error', listener: (err: Error) => void): this;
}
26 changes: 26 additions & 0 deletions src/receiver/htp.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import { AbstractMergingReceiver } from './abstract';
import { PreparedData } from '../data/merging';
import { DMXUniverse } from '../data/universe';

export class HTPMergingReceiver extends AbstractMergingReceiver {
// eslint-disable-next-line class-methods-use-this
protected mergeData(data: PreparedData): DMXUniverse | null {
const mergedData = new DMXUniverse();

for (const [, tmp] of data.universeData.servers) {
if (
tmp.priority === data.maximumPriority &&
tmp.universe === data.universe
) {
for (let ch = 0; ch < 512; ch += 1) {
const newValue = tmp.data.data[ch] || 0;
if ((mergedData.data[ch] ?? 0) < newValue) {
mergedData.data[ch] = newValue;
}
}
}
}

return mergedData;
}
}
25 changes: 25 additions & 0 deletions src/receiver/ltp.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import { DMXUniverse } from '../data/universe';
import { PreparedData } from '../data/merging';
import { AbstractMergingReceiver } from './abstract';

export class LTPMergingReceiver extends AbstractMergingReceiver {
// eslint-disable-next-line class-methods-use-this
protected mergeData(data: PreparedData): DMXUniverse | null {
const { referenceData } = data.universeData;
const mergedData = new DMXUniverse();

for (let ch = 0; ch < 512; ch += 1) {
const referenceTime = 0;
mergedData.data[ch] = referenceData.data[ch] || 0;

for (const [, tmp] of data.universeData.servers) {
if (tmp.lastUpdate > referenceTime) {
mergedData.data[ch] =
tmp.data.data[ch] || (mergedData.data[ch] as number);
}
}
}

return mergedData;
}
}

0 comments on commit a6ccad0

Please sign in to comment.