diff --git a/src/data/merging.ts b/src/data/merging.ts new file mode 100644 index 0000000..ab810bc --- /dev/null +++ b/src/data/merging.ts @@ -0,0 +1,49 @@ +import { DMXUniverse } from './universe'; +import { Packet } from '../packet'; + +export interface SenderData { + readonly cid: string; + readonly priority: number; + readonly universe: number; + readonly data: DMXUniverse; + readonly lastUpdate: number; + readonly sequence: number; +} + +export interface UniverseData { + referenceData: DMXUniverse; + servers: Map; +} + +export interface PreparedData { + universe: number; + maximumPriority: number; + universeData: UniverseData; +} + +export interface ProcessedData extends PreparedData { + mergedData: DMXUniverse; +} + +export interface MergingReceiverChannelChanged { + universe: number; + address: number; + newValue: number; + oldValue: number; +} + +export interface MergingReceiverUniverseChanged { + universe: number; + payload: number[]; +} + +export interface MergingReceiverSenderConnect { + cid: number; + universe: number; + firstPacket: Packet; +} +export interface MergingReceiverSenderDisconnect { + cid: number; + universe: number; + lastPacket: Packet; +} diff --git a/src/data/universe.ts b/src/data/universe.ts new file mode 100644 index 0000000..da7d87d --- /dev/null +++ b/src/data/universe.ts @@ -0,0 +1,13 @@ +export class DMXUniverse { + readonly data: number[] = new Array(512); + + constructor(recordData: Record = {}) { + // init universe using `0` for every channel + this.data.fill(0); + + // set provided values + for (const addr in recordData) { + this.data[+addr - 1] = recordData[+addr] ?? 0; + } + } +} diff --git a/src/index.ts b/src/index.ts index 4e7b8f5..264e1bc 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,3 +1,5 @@ export * from './packet'; export * from './receiver'; +export * from './receiver/htp'; +export * from './receiver/ltp'; export * from './sender'; diff --git a/src/receiver/abstract.ts b/src/receiver/abstract.ts new file mode 100644 index 0000000..c4a9f95 --- /dev/null +++ b/src/receiver/abstract.ts @@ -0,0 +1,175 @@ +import { AssertionError } from 'assert'; +import { Receiver, ReceiverProps } from '../receiver'; +import { DMXUniverse } from '../data/universe'; +import { Packet } from '../packet'; +import { + MergingReceiverChannelChanged, + MergingReceiverSenderConnect, + MergingReceiverSenderDisconnect, + MergingReceiverUniverseChanged, + PreparedData, + ProcessedData, + UniverseData, +} from '../data/merging'; + +export interface MergingReceiverProps extends ReceiverProps { + universes?: number[]; + port?: number; + iface?: string; + reuseAddr?: boolean; + timeout?: number; +} + +export abstract class AbstractMergingReceiver extends Receiver { + protected readonly timeout: number; + + protected data = new Map(); + + constructor({ timeout = 5000, ...props }: MergingReceiverProps) { + super(props); + + this.timeout = timeout; + + super.on('packet', (packet) => { + const data = this.prepareData(packet); + const mergedData = this.mergeData(data); + if (mergedData !== null) { + this.handleChanges({ + ...data, + mergedData, + }); + } + }); + } + + protected prepareData(packet: Packet): PreparedData { + const currentTime = performance.now(); + const universe = parseInt(packet.universe.toString(36), 10); + const cid = packet.cid.toString(); + + // universe is unknown + if (!this.data.has(universe)) { + this.data.set(universe, { + referenceData: new DMXUniverse(), + servers: new Map(), + }); + + this.emit('newUniverse', { + universe, + firstPacket: packet, + }); + } + + const universeData: UniverseData = this.data.get(universe) as UniverseData; + if (!universeData) { + throw new Error('[sACN] Internal Error: universeData is undefined'); + } + + // sender is unknown for this universe + if (!universeData.servers.has(cid)) { + this.emit('senderConnect', { + cid: packet.cid, + universe, + firstPacket: packet, + }); + } + + // register current package + universeData.servers.set(cid, { + cid: packet.cid.toString(), + priority: packet.priority, + universe, + data: new DMXUniverse(packet.payload), + lastUpdate: currentTime, + sequence: packet.sequence, + }); + + // check whether sender disconnects + setTimeout(() => { + if (universeData.servers.get(cid)?.lastUpdate === currentTime) { + universeData.servers.delete(cid); + + this.emit('senderDisconnect', { + cid: packet.cid, + universe, + lastPacket: packet, + }); + } + }, this.timeout); + + // detect which source has the highest per-universe priority + let maximumPriority = 0; + for (const [, data] of universeData.servers) { + if ( + data.priority > maximumPriority && + data.universe === packet.universe + ) { + maximumPriority = data.priority; + } + } + + return { + universe, + maximumPriority, + universeData, + }; + } + + protected abstract mergeData(packet: PreparedData): DMXUniverse | null; + + protected handleChanges(data: ProcessedData): void { + const { referenceData } = data.universeData; + const { mergedData } = data; + + // only changes + let changesDetected = false; + for (let ch = 0; ch < 512; ch += 1) { + if (referenceData.data[ch] !== mergedData.data[ch]) { + changesDetected = true; + + super.emit('changedValue', { + universe: data.universe, + address: ch + 1, + newValue: mergedData.data[ch], + oldValue: referenceData.data[ch], + } as MergingReceiverChannelChanged); + } + } + + if (changesDetected) { + this.data.get(data.universe)!.referenceData = mergedData; + + super.emit('changed', { + universe: data.universe, + payload: mergedData.data, + } as MergingReceiverUniverseChanged); + } + } +} + +export declare interface AbstractMergingReceiver { + on( + event: Parameters[0], + listener: Parameters[1], + ): this; + on( + event: 'changedValue', + listener: (ev: MergingReceiverChannelChanged) => void, + ): this; + on( + event: 'changed', + listener: (ev: MergingReceiverUniverseChanged) => void, + ): this; + on( + event: 'senderConnect', + listener: (ev: MergingReceiverSenderConnect) => void, + ): this; + on( + event: 'senderDisconnect', + listener: (ev: MergingReceiverSenderDisconnect) => void, + ): this; + on(event: 'packet', listener: (packet: Packet) => void): this; + on(event: 'PacketCorruption', listener: (err: AssertionError) => void): this; + on(event: 'PacketOutOfOrder', listener: (err: Error) => void): this; + on(event: 'error', listener: (err: Error) => void): this; +} diff --git a/src/receiver/htp.ts b/src/receiver/htp.ts new file mode 100644 index 0000000..84b527b --- /dev/null +++ b/src/receiver/htp.ts @@ -0,0 +1,26 @@ +import { AbstractMergingReceiver } from './abstract'; +import { PreparedData } from '../data/merging'; +import { DMXUniverse } from '../data/universe'; + +export class HTPMergingReceiver extends AbstractMergingReceiver { + // eslint-disable-next-line class-methods-use-this + protected mergeData(data: PreparedData): DMXUniverse | null { + const mergedData = new DMXUniverse(); + + for (const [, tmp] of data.universeData.servers) { + if ( + tmp.priority === data.maximumPriority && + tmp.universe === data.universe + ) { + for (let ch = 0; ch < 512; ch += 1) { + const newValue = tmp.data.data[ch] || 0; + if ((mergedData.data[ch] ?? 0) < newValue) { + mergedData.data[ch] = newValue; + } + } + } + } + + return mergedData; + } +} diff --git a/src/receiver/ltp.ts b/src/receiver/ltp.ts new file mode 100644 index 0000000..6512d2e --- /dev/null +++ b/src/receiver/ltp.ts @@ -0,0 +1,25 @@ +import { DMXUniverse } from '../data/universe'; +import { PreparedData } from '../data/merging'; +import { AbstractMergingReceiver } from './abstract'; + +export class LTPMergingReceiver extends AbstractMergingReceiver { + // eslint-disable-next-line class-methods-use-this + protected mergeData(data: PreparedData): DMXUniverse | null { + const { referenceData } = data.universeData; + const mergedData = new DMXUniverse(); + + for (let ch = 0; ch < 512; ch += 1) { + const referenceTime = 0; + mergedData.data[ch] = referenceData.data[ch] || 0; + + for (const [, tmp] of data.universeData.servers) { + if (tmp.lastUpdate > referenceTime) { + mergedData.data[ch] = + tmp.data.data[ch] || (mergedData.data[ch] as number); + } + } + } + + return mergedData; + } +}