Skip to content

Commit

Permalink
CHANGE Better determine the correct responseTime to use to make it …
Browse files Browse the repository at this point in the history
…less likely to elect duplicate leaders
  • Loading branch information
pubkey committed Dec 15, 2021
1 parent 31a86ad commit 3bc74c7
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 21 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# CHANGELOG

## 4.8.0 (15 December 2021)

Changes:
- Better determine the correct `responseTime` to use to make it less likely to elect duplicate leaders.

## 4.7.1 (13 December 2021)

Expand Down
61 changes: 45 additions & 16 deletions src/leader-election.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ const LeaderElection = function (broadcastChannel, options) {
// things to clean up
this._unl = []; // _unloads
this._lstns = []; // _listeners
this._invs = []; // _intervals
this._dpL = () => { }; // onduplicate listener
this._dpLC = false; // true when onduplicate called

Expand Down Expand Up @@ -61,7 +60,10 @@ LeaderElection.prototype = {
* false if not.
* @async
*/
applyOnce() {
applyOnce(
// true if the applyOnce() call came from the fallbackInterval cycle
isFromFallbackInterval
) {
if (this.isLeader) {
return sleep(0, true);
}
Expand Down Expand Up @@ -126,16 +128,29 @@ LeaderElection.prototype = {
}
};
this.broadcastChannel.addEventListener('internal', handleMessage);

/**
* If the applyOnce() call came from the fallbackInterval,
* we can assume that the election runs in the background and
* no critical process is waiting for it.
* When this is true, we give the other instances
* more time to answer messages in the election cycle.
* This makes it less likely to elect duplicate leaders.
* It also takes longer, which is not a problem because we
* run in the background anyway.
*/
const waitForAnswerTime = isFromFallbackInterval ? this._options.responseTime * 4 : this._options.responseTime;

const applyPromise = _sendMessage(this, 'apply') // send out that this one is applying
.then(() => Promise.race([
sleep(this._options.responseTime / 2),
sleep(waitForAnswerTime),
stopCriteriaPromise.then(() => Promise.reject(new Error()))
]))
// send again in case another instance was just created
.then(() => _sendMessage(this, 'apply'))
// let others time to respond
.then(() => Promise.race([
sleep(this._options.responseTime / 2),
sleep(waitForAnswerTime),
stopCriteriaPromise.then(() => Promise.reject(new Error()))
]))
.catch(() => { })
Expand Down Expand Up @@ -177,8 +192,6 @@ LeaderElection.prototype = {
die() {
this._lstns.forEach(listener => this.broadcastChannel.removeEventListener('internal', listener));
this._lstns = [];
this._invs.forEach(interval => clearInterval(interval));
this._invs = [];
this._unl.forEach(uFn => uFn.remove());
this._unl = [];

Expand Down Expand Up @@ -207,7 +220,6 @@ function _awaitLeadershipOnce(leaderElector) {
return;
}
resolved = true;
clearInterval(interval);
leaderElector.broadcastChannel.removeEventListener('internal', whenDeathListener);
res(true);
}
Expand All @@ -219,15 +231,32 @@ function _awaitLeadershipOnce(leaderElector) {
}
});

// try on fallbackInterval
const interval = setInterval(() => {
leaderElector.applyOnce().then(() => {
if (leaderElector.isLeader) {
finish();
}
});
}, leaderElector._options.fallbackInterval);
leaderElector._invs.push(interval);
/**
* Try on fallbackInterval
* @recursive
*/
const tryOnFallBack = () => {
    // Wait one fallbackInterval before (re-)checking the election state.
    const fallbackDelay = leaderElector._options.fallbackInterval;
    return sleep(fallbackDelay).then(() => {
        // Stop the cycle once the elector is dead or leadership was already resolved.
        if (leaderElector.isDead || resolved) {
            return;
        }

        // Leadership was obtained while we were sleeping.
        if (leaderElector.isLeader) {
            finish();
            return;
        }

        // Apply again, flagging the call as coming from the fallback cycle.
        return leaderElector.applyOnce(true).then(() => {
            if (!leaderElector.isLeader) {
                // Still no leadership: schedule the next round (not awaited).
                tryOnFallBack();
                return;
            }
            finish();
        });
    });
};
tryOnFallBack();

// try when other leader dies
const whenDeathListener = msg => {
Expand Down
9 changes: 8 additions & 1 deletion test/integration.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -643,9 +643,16 @@ function runTest(channelOptions) {
const elector2 = createLeaderElection(channel2);

await elector.awaitLeadership();
await AsyncTestUtil.wait(200);

let resolved = false;
elector2.awaitLeadership().then(() => resolved = true);
elector2.awaitLeadership().then(() => {
resolved = true;
});

// wait for the applyQueue to be done
// to not accidentally skip testing the fallbackInterval election cycle.
await elector2._aplQ;

// overwrite postInternal to simulate non-responding leader
channel.postInternal = () => Promise.resolve();
Expand Down
2 changes: 1 addition & 1 deletion test/unit/indexed-db.method.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ describe('unit/indexed-db.method.test.js', () => {
});
/**
* localstorage-pings do not work in a web-workers,
* which means this should be detected and work over intervall
* which means this should be detected and work over interval
* @link https://stackoverflow.com/a/6179599/3443137
*/
it('should also work if localstorage does not work', async () => {
Expand Down
10 changes: 7 additions & 3 deletions types/leader-election.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,12 @@ import {

export type LeaderElectionOptions = {
/**
* This value decides how often instances will renegotiate who is leader.
* Probably should be at least 2x bigger than responseTime.
* Normally, when the leading JavaScript process dies, it will send an I-am-dead
* message to the other LeaderElectors, so that they can elect a new leader.
* In rare cases, when the JavaScript process exits ungracefully, it can happen
* that the other electors do not get a dead-message.
* So we also have to run the election cycle in an interval to ensure
* we never get stuck in a state where no one is leader and no one is trying to get elected.
*/
fallbackInterval?: number;
/**
Expand Down Expand Up @@ -41,7 +45,7 @@ export declare class LeaderElector {
readonly isDead: boolean;
readonly token: string;

applyOnce(): Promise<boolean>;
applyOnce(isFromFallbackInterval?: boolean): Promise<boolean>;
awaitLeadership(): Promise<void>;
die(): Promise<void>;

Expand Down

0 comments on commit 3bc74c7

Please sign in to comment.