Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[NOT READY] add getStats override #25

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
281 changes: 156 additions & 125 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,22 +1,29 @@
/* jshint node: true */
'use strict';
"use strict"

var EventEmitter = require('events').EventEmitter;
var Reporter = require('./lib/reporter');
var MonitoredConnection = require('./lib/connection').MonitoredConnection;
var error = require('./lib/error');
var util = require('./lib/util');
var wildcard = require('wildcard');
const detectProvider = require('./stats');
var EventEmitter = require("events").EventEmitter
var Reporter = require("./lib/reporter")
var MonitoredConnection = require("./lib/connection").MonitoredConnection
var error = require("./lib/error")
var util = require("./lib/util")
var wildcard = require("wildcard")
const detectProvider = require("./stats")

var OPTIONAL_MONITOR_EVENTS = [
'negotiate:request', 'negotiate:renegotiate', 'negotiate:abort',
'negotiate:createOffer', 'negotiate:createOffer:created',
'negotiate:createAnswer', 'negotiate:createAnswer:created',
'negotiate:setlocaldescription',
'icecandidate:local', 'icecandidate:remote', 'icecandidate:gathered', 'icecandidate:added',
'sdp:received'
];
"negotiate:request",
"negotiate:renegotiate",
"negotiate:abort",
"negotiate:createOffer",
"negotiate:createOffer:created",
"negotiate:createAnswer",
"negotiate:createAnswer:created",
"negotiate:setlocaldescription",
"icecandidate:local",
"icecandidate:remote",
"icecandidate:gathered",
"icecandidate:added",
"sdp:received",
]

/**
# rtc-health
Expand All @@ -37,194 +44,218 @@ var OPTIONAL_MONITOR_EVENTS = [
This captures data for video streaming

**/
module.exports = function(qc, opts) {
module.exports = function (qc, opts) {
opts = opts || {}

opts = opts || {};

var provider = null;
var emitter = new EventEmitter();
emitter.pollInterval = opts.pollInterval || 1000;
var connections = {};
var timers = {};
var logs = {};
var provider = null
var emitter = new EventEmitter()
emitter.pollInterval = opts.pollInterval || 1000
var connections = {}
var timers = {}
var logs = {}

function log(peerId, pc, data) {
if (!provider) return;
return provider.getStats(pc).then((reports) => {
const tc = connections[data.id];

// Only reschedule while we are monitoring
if (tc) {
timers[data.id] = setTimeout(log.bind(this, peerId, pc, data), emitter.pollInterval);
}
if (!provider) return
return provider
.getStats(pc)
.then((reports) => {
const tc = connections[data.id]

// Only reschedule while we are monitoring
if (tc) {
timers[data.id] = setTimeout(
log.bind(this, peerId, pc, data),
emitter.pollInterval
)
}

var reporter = new Reporter({
source: qc.id,
about: data,
pc: pc,
reports: reports
});

emitter.emit('health:report', reporter, pc);
}).catch((err) => {
// No operation
});
var reporter = new Reporter({
source: qc.id,
about: data,
pc: pc,
reports: reports,
})

emitter.emit("health:report", reporter, pc)
})
.catch((err) => {
// No operation
})
}

/**
Emit a generic notification event that allows for tapping into the activity that is happening
within quickconnect
**/
function notify(eventName, opts) {
var args = Array.prototype.slice.call(arguments, 2);
emitter.emit('health:notify', eventName, opts, args);
emitter.emit.apply(emitter, (['health:' + eventName, opts].concat(args)));
var args = Array.prototype.slice.call(arguments, 2)
emitter.emit("health:notify", eventName, opts, args)
emitter.emit.apply(emitter, ["health:" + eventName, opts].concat(args))
}

function connectionFailure() {
var args = Array.prototype.slice.call(arguments);
emitter.emit.apply(emitter, ['health:connection:failure'].concat(args));
var args = Array.prototype.slice.call(arguments)
emitter.emit.apply(emitter, ["health:connection:failure"].concat(args))
}

function trackConnection(peerId, pc, data) {
var tc = new MonitoredConnection(qc, pc, data, {
timeUntilFailure: opts.connectionFailureTime,
onFailure: connectionFailure,
});
connections[data.id] = tc;
notify('started', { source: qc.id, about: data.id, tracker: tc });
log(peerId, pc, data);
return tc;
})
connections[data.id] = tc
notify("started", { source: qc.id, about: data.id, tracker: tc })
log(peerId, pc, data)
return tc
}

/**
Handles connection closures, either as a result of the peer connection
disconnecting, or the call ending (prior to a PC being created)
**/
function connectionClosed(peerId) {
var tc = connections[peerId];
if (!tc) return;
tc.closed();
var tc = connections[peerId]
if (!tc) return
tc.closed()

// Stop the reporting for this peer connection
if (timers[peerId]) clearTimeout(timers[peerId]);
delete connections[peerId];
if (timers[peerId]) clearTimeout(timers[peerId])
delete connections[peerId]

// Emit a closure status update
emitter.emit('health:report', new Reporter({
source: qc.id,
about: {
id: peerId,
room: tc.room
},
status: 'closed',
force: true
}));

notify('closed', { source: qc.id, about: peerId, tracker: tc });
emitter.emit(
"health:report",
new Reporter({
source: qc.id,
about: {
id: peerId,
room: tc.room,
},
status: "closed",
force: true,
})
)

notify("closed", { source: qc.id, about: peerId, tracker: tc })
}

/**
Handle the peer connection being created
**/
qc.on('peer:connect', trackConnection);

qc.on('peer:couple', function(peerId, pc, data, monitor) {
qc.on("peer:connect", trackConnection)

qc.on("peer:couple", function (peerId, pc, data, monitor) {
// Store that we are currently tracking the target peer
var tc = connections[data.id];
var status = util.toStatus(pc.iceConnectionState);
if (!tc) tc = trackConnection(peerId, pc, data);

monitor.on('statechange', function(pc, state) {
var iceConnectionState = pc.iceConnectionState;
var newStatus = util.toStatus(iceConnectionState);
notify('icestatus', {
source: qc.id, about: data.id, tracker: tc
}, iceConnectionState);
emitter.emit('health:changed', tc, iceConnectionState);
var tc = connections[data.id]
var status = util.toStatus(pc.iceConnectionState)
if (!tc) tc = trackConnection(peerId, pc, data)

monitor.on("statechange", function (pc, state) {
var iceConnectionState = pc.iceConnectionState
var newStatus = util.toStatus(iceConnectionState)
notify(
"icestatus",
{
source: qc.id,
about: data.id,
tracker: tc,
},
iceConnectionState
)
emitter.emit("health:changed", tc, iceConnectionState)

if (status != newStatus) {
emitter.emit('health:connection:status', tc, newStatus, status);
status = newStatus;
if (status === 'connected') {
tc.connected();
} else if (status === 'error') {
tc.failed('ICE connection state error');
emitter.emit("health:connection:status", tc, newStatus, status)
status = newStatus
if (status === "connected") {
tc.connected()
} else if (status === "error") {
tc.failed("ICE connection state error")
}
}
});
})

monitor.on('closed', connectionClosed.bind(this, peerId));
});
monitor.on("closed", connectionClosed.bind(this, peerId))
})

qc.on('call:failed', function(peerId) {
var tc = connections[peerId];
tc.failed('Call failed to connect');
});
qc.on("call:failed", function (peerId) {
var tc = connections[peerId]
tc.failed("Call failed to connect")
})

// Close tracked connections on call:ended as well
qc.on('call:ended', connectionClosed);
qc.on("call:ended", connectionClosed)

// Setup to listen to the entire feed
qc.feed(function(evt) {
var name = evt.name;
qc.feed(function (evt) {
var name = evt.name

if (util.SIGNALLER_EVENTS.indexOf(name) >= 0) {
return notify.apply(
notify,
[name, { source: qc.id, about: 'signaller' }].concat(evt.args)
);
[name, { source: qc.id, about: "signaller" }].concat(evt.args)
)
}

// Listen for the optional verbose events
var matching = opts.verbose && wildcard('pc.*', name);
var matching = opts.verbose && wildcard("pc.*", name)
if (matching) {
var peerId = matching[1];
var shortName = matching.slice(2).join('.');
var tc = connections[peerId];
var peerId = matching[1]
var shortName = matching.slice(2).join(".")
var tc = connections[peerId]
return notify.apply(
notify,
[shortName, { source: qc.id, about: peerId, tracker: tc }].concat(evt.args)
);
[shortName, { source: qc.id, about: peerId, tracker: tc }].concat(
evt.args
)
)
}
});
})

// Helper method to safely close all connections
emitter.closeConnections = function() {
emitter.closeConnections = function () {
for (var connId in connections) {
connections[connId].close();
connections[connId].close()
}
};
}

// Helper method to expose the current tracked connections
emitter.getConnections = function() {
var results = [];
emitter.getConnections = function () {
var results = []
for (var target in connections) {
results.push(connections[target]);
results.push(connections[target])
}
return results;
};
return results
}

// Get the tracked connection for the given target peer
emitter.getConnection = function(target) {
return connections && connections[target];
};
emitter.getConnection = function (target) {
return connections && connections[target]
}

// Override provider's getStats function with provided getStats function
emitter.setGetStatsFn = function (getStatsFn) {
if (!provider) return

provider.provider.setGetStatsFn?.(getStatsFn)
}

// Provider detection
function detect() {
provider = detectProvider(opts);
provider = detectProvider(opts)
if (!provider) {
console.log('WARNING! No WebRTC provider detected - rtc-health is disabled until a provider is detected');
console.log(
"WARNING! No WebRTC provider detected - rtc-health is disabled until a provider is detected"
)
}
}

// In the case of some plugins, we don't get notified that it's using a plugin, and
// have no means to reasonably identify the existence of a plugin
// So as a final fallback, rerun the detection on a local announce
qc.once('local:announce', detect);
qc.once("local:announce", detect)

// Attempt to detect initially
detect();
return emitter;
};
detect()
return emitter
}
Loading