-
Notifications
You must be signed in to change notification settings - Fork 13
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
implement htp and ltp merging receivers
- Loading branch information
Showing
6 changed files
with
291 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
import { DMXUniverse } from './universe'; | ||
import { Packet } from '../packet'; | ||
|
||
export interface SenderData { | ||
readonly cid: string; | ||
readonly priority: number; | ||
readonly universe: number; | ||
readonly data: DMXUniverse; | ||
readonly lastUpdate: number; | ||
readonly sequence: number; | ||
} | ||
|
||
export interface UniverseData { | ||
referenceData: DMXUniverse; | ||
servers: Map<string, SenderData>; | ||
} | ||
|
||
export interface PreparedData { | ||
universe: number; | ||
maximumPriority: number; | ||
universeData: UniverseData; | ||
} | ||
|
||
export interface ProcessedData extends PreparedData { | ||
mergedData: DMXUniverse; | ||
} | ||
|
||
export interface MergingReceiverChannelChanged { | ||
universe: number; | ||
address: number; | ||
newValue: number; | ||
oldValue: number; | ||
} | ||
|
||
export interface MergingReceiverUniverseChanged { | ||
universe: number; | ||
payload: number[]; | ||
} | ||
|
||
export interface MergingReceiverSenderConnect { | ||
cid: number; | ||
universe: number; | ||
firstPacket: Packet; | ||
} | ||
|
||
export interface MergingReceiverSenderDisconnect { | ||
cid: number; | ||
universe: number; | ||
lastPacket: Packet; | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
export class DMXUniverse { | ||
readonly data: number[] = new Array(512); | ||
|
||
constructor(recordData: Record<number, number> = {}) { | ||
// init universe using `0` for every channel | ||
this.data.fill(0); | ||
|
||
// set provided values | ||
for (const addr in recordData) { | ||
this.data[+addr - 1] = recordData[+addr] ?? 0; | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,5 @@ | ||
export * from './packet'; | ||
export * from './receiver'; | ||
export * from './receiver/htp'; | ||
export * from './receiver/ltp'; | ||
export * from './sender'; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,175 @@ | ||
import { AssertionError } from 'assert'; | ||
import { Receiver, ReceiverProps } from '../receiver'; | ||
import { DMXUniverse } from '../data/universe'; | ||
import { Packet } from '../packet'; | ||
import { | ||
MergingReceiverChannelChanged, | ||
MergingReceiverSenderConnect, | ||
MergingReceiverSenderDisconnect, | ||
MergingReceiverUniverseChanged, | ||
PreparedData, | ||
ProcessedData, | ||
UniverseData, | ||
} from '../data/merging'; | ||
|
||
export interface MergingReceiverProps extends ReceiverProps { | ||
universes?: number[]; | ||
port?: number; | ||
iface?: string; | ||
reuseAddr?: boolean; | ||
timeout?: number; | ||
} | ||
|
||
export abstract class AbstractMergingReceiver extends Receiver { | ||
protected readonly timeout: number; | ||
|
||
protected data = new Map<number, UniverseData>(); | ||
|
||
constructor({ timeout = 5000, ...props }: MergingReceiverProps) { | ||
super(props); | ||
|
||
this.timeout = timeout; | ||
|
||
super.on('packet', (packet) => { | ||
const data = this.prepareData(packet); | ||
const mergedData = this.mergeData(data); | ||
if (mergedData !== null) { | ||
this.handleChanges({ | ||
...data, | ||
mergedData, | ||
}); | ||
} | ||
}); | ||
} | ||
|
||
protected prepareData(packet: Packet): PreparedData { | ||
const currentTime = performance.now(); | ||
const universe = parseInt(packet.universe.toString(36), 10); | ||
const cid = packet.cid.toString(); | ||
|
||
// universe is unknown | ||
if (!this.data.has(universe)) { | ||
this.data.set(universe, { | ||
referenceData: new DMXUniverse(), | ||
servers: new Map(), | ||
}); | ||
|
||
this.emit('newUniverse', { | ||
universe, | ||
firstPacket: packet, | ||
}); | ||
} | ||
|
||
const universeData: UniverseData = this.data.get(universe) as UniverseData; | ||
if (!universeData) { | ||
throw new Error('[sACN] Internal Error: universeData is undefined'); | ||
} | ||
|
||
// sender is unknown for this universe | ||
if (!universeData.servers.has(cid)) { | ||
this.emit('senderConnect', { | ||
cid: packet.cid, | ||
universe, | ||
firstPacket: packet, | ||
}); | ||
} | ||
|
||
// register current package | ||
universeData.servers.set(cid, { | ||
cid: packet.cid.toString(), | ||
priority: packet.priority, | ||
universe, | ||
data: new DMXUniverse(packet.payload), | ||
lastUpdate: currentTime, | ||
sequence: packet.sequence, | ||
}); | ||
|
||
// check whether sender disconnects | ||
setTimeout(() => { | ||
if (universeData.servers.get(cid)?.lastUpdate === currentTime) { | ||
universeData.servers.delete(cid); | ||
|
||
this.emit('senderDisconnect', { | ||
cid: packet.cid, | ||
universe, | ||
lastPacket: packet, | ||
}); | ||
} | ||
}, this.timeout); | ||
|
||
// detect which source has the highest per-universe priority | ||
let maximumPriority = 0; | ||
for (const [, data] of universeData.servers) { | ||
if ( | ||
data.priority > maximumPriority && | ||
data.universe === packet.universe | ||
) { | ||
maximumPriority = data.priority; | ||
} | ||
} | ||
|
||
return { | ||
universe, | ||
maximumPriority, | ||
universeData, | ||
}; | ||
} | ||
|
||
protected abstract mergeData(packet: PreparedData): DMXUniverse | null; | ||
|
||
protected handleChanges(data: ProcessedData): void { | ||
const { referenceData } = data.universeData; | ||
const { mergedData } = data; | ||
|
||
// only changes | ||
let changesDetected = false; | ||
for (let ch = 0; ch < 512; ch += 1) { | ||
if (referenceData.data[ch] !== mergedData.data[ch]) { | ||
changesDetected = true; | ||
|
||
super.emit('changedValue', { | ||
universe: data.universe, | ||
address: ch + 1, | ||
newValue: mergedData.data[ch], | ||
oldValue: referenceData.data[ch], | ||
} as MergingReceiverChannelChanged); | ||
} | ||
} | ||
|
||
if (changesDetected) { | ||
this.data.get(data.universe)!.referenceData = mergedData; | ||
|
||
super.emit('changed', { | ||
universe: data.universe, | ||
payload: mergedData.data, | ||
} as MergingReceiverUniverseChanged); | ||
} | ||
} | ||
} | ||
|
||
export declare interface AbstractMergingReceiver { | ||
on( | ||
event: Parameters<Receiver['on']>[0], | ||
listener: Parameters<Receiver['on']>[1], | ||
): this; | ||
on( | ||
event: 'changedValue', | ||
listener: (ev: MergingReceiverChannelChanged) => void, | ||
): this; | ||
on( | ||
event: 'changed', | ||
listener: (ev: MergingReceiverUniverseChanged) => void, | ||
): this; | ||
on( | ||
event: 'senderConnect', | ||
listener: (ev: MergingReceiverSenderConnect) => void, | ||
): this; | ||
on( | ||
event: 'senderDisconnect', | ||
listener: (ev: MergingReceiverSenderDisconnect) => void, | ||
): this; | ||
on(event: 'packet', listener: (packet: Packet) => void): this; | ||
on(event: 'PacketCorruption', listener: (err: AssertionError) => void): this; | ||
on(event: 'PacketOutOfOrder', listener: (err: Error) => void): this; | ||
on(event: 'error', listener: (err: Error) => void): this; | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
import { AbstractMergingReceiver } from './abstract'; | ||
import { PreparedData } from '../data/merging'; | ||
import { DMXUniverse } from '../data/universe'; | ||
|
||
export class HTPMergingReceiver extends AbstractMergingReceiver { | ||
// eslint-disable-next-line class-methods-use-this | ||
protected mergeData(data: PreparedData): DMXUniverse | null { | ||
const mergedData = new DMXUniverse(); | ||
|
||
for (const [, tmp] of data.universeData.servers) { | ||
if ( | ||
tmp.priority === data.maximumPriority && | ||
tmp.universe === data.universe | ||
) { | ||
for (let ch = 0; ch < 512; ch += 1) { | ||
const newValue = tmp.data.data[ch] || 0; | ||
if ((mergedData.data[ch] ?? 0) < newValue) { | ||
mergedData.data[ch] = newValue; | ||
} | ||
} | ||
} | ||
} | ||
|
||
return mergedData; | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
import { DMXUniverse } from '../data/universe'; | ||
import { PreparedData } from '../data/merging'; | ||
import { AbstractMergingReceiver } from './abstract'; | ||
|
||
export class LTPMergingReceiver extends AbstractMergingReceiver { | ||
// eslint-disable-next-line class-methods-use-this | ||
protected mergeData(data: PreparedData): DMXUniverse | null { | ||
const { referenceData } = data.universeData; | ||
const mergedData = new DMXUniverse(); | ||
|
||
for (let ch = 0; ch < 512; ch += 1) { | ||
const referenceTime = 0; | ||
mergedData.data[ch] = referenceData.data[ch] || 0; | ||
|
||
for (const [, tmp] of data.universeData.servers) { | ||
if (tmp.lastUpdate > referenceTime) { | ||
mergedData.data[ch] = | ||
tmp.data.data[ch] || (mergedData.data[ch] as number); | ||
} | ||
} | ||
} | ||
|
||
return mergedData; | ||
} | ||
} |