Skip to content

Commit

Permalink
FIX Prevent EMFILE, too many open files error when writing many mes…
Browse files Browse the repository at this point in the history
…sages at once
  • Loading branch information
pubkey committed Dec 3, 2021
1 parent 830b8f6 commit 1cdf8a3
Show file tree
Hide file tree
Showing 8 changed files with 79 additions and 15 deletions.
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
# CHANGELOG

## X.X.X (comming soon)
## 4.7.0 (3 December 2021)

Bugfixes:
- Prevent `EMFILE, too many open files` error when writing many messages at once.

## 4.6.0 (2 December 2021)

Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
"microseconds": "0.2.0",
"nano-time": "1.0.0",
"oblivious-set": "1.0.0",
"p-queue": "6.6.2",
"rimraf": "3.0.2",
"unload": "2.3.1"
},
Expand Down
20 changes: 19 additions & 1 deletion perf.txt
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,24 @@ AFTER: {


===================================
2.12.2021
3.12.2021
===================================

{
"openClose": 1110.2557100057602,
"sendRecieve": {
"parallel": 279.0764960050583,
"series": 2797.712993979454
},
"leaderElection": 2122.0940190553665
}


{
"openClose": 885.0331689119339,
"sendRecieve": {
"parallel": 279.1763379573822,
"series": 2232.475461959839
},
"leaderElection": 2150.9966419935226
}
22 changes: 16 additions & 6 deletions src/methods/node.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import path from 'path';
import micro from 'nano-time';
import rimraf from 'rimraf';
import isNode from 'detect-node';
import PQueue from 'p-queue';
import {
add as unloadAdd
} from 'unload';
Expand Down Expand Up @@ -387,6 +388,12 @@ export async function create(channelName, options = {}) {
paths,
// contains all messages that have been emitted before
emittedMessagesIds: new ObliviousSet(options.node.ttl * 2),
/**
* Used to ensure we do not write too many files at once
* which could throw an error.
* Must always be smaller then options.node.maxParallelWrites
*/
writeFileQueue: new PQueue({ concurrency: options.node.maxParallelWrites }),
messagesCallbackTime: null,
messagesCallback: null,
// ensures we do not read messages in parrallel
Expand Down Expand Up @@ -533,16 +540,19 @@ export function refreshReaderClients(channelState) {
});
}


/**
* post a message to the other readers
* @return {Promise<void>}
*/
export function postMessage(channelState, messageJson) {
const writePromise = writeMessage(
channelState.channelName,
channelState.uuid,
messageJson,
channelState.paths
export async function postMessage(channelState, messageJson) {
const writePromise = channelState.writeFileQueue.add(
() => writeMessage(
channelState.channelName,
channelState.uuid,
messageJson,
channelState.paths
)
);
channelState.writeBlockPromise = channelState.writeBlockPromise.then(async () => {

Expand Down
5 changes: 5 additions & 0 deletions src/options.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ export function fillOptionsWithDefaults(originalOptions = {}) {
// node
if (!options.node) options.node = {};
if (!options.node.ttl) options.node.ttl = 1000 * 60 * 2; // 2 minutes;
/**
* On linux use 'ulimit -Hn' to get the limit of open files.
* On ubuntu this was 4096 for me, so we use half of that as maxParallelWrites default.
*/
if (!options.node.maxParallelWrites) options.node.maxParallelWrites = 2048;
if (typeof options.node.useFastPath === 'undefined') options.node.useFastPath = true;

return options;
Expand Down
16 changes: 16 additions & 0 deletions test/issues.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,20 @@ describe('issues.test.js', () => {
await channel1.close();
await channel2.close();
});
it('write many messages and then close', async function() {
this.timeout(40 * 1000);
const channelName = AsyncTestUtil.randomString(12);
const channel = new BroadcastChannel(channelName);
new Array(5000)
.fill(0)
.map((_i, idx) => ({
foo: 'bar',
idx,
longString: AsyncTestUtil.randomString(40)
}))
.map(msg => channel.postMessage(msg));


await channel.close();
});
});
19 changes: 12 additions & 7 deletions test/performance.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ const benchmark = {
sendRecieve: {}
};

const options = {
node: {
useFastPath: false
}
};

const elapsedTime = before => {
return AsyncTestUtil.performanceNow() - before;
Expand All @@ -30,7 +35,7 @@ describe('performance.test.js', () => {

const startTime = AsyncTestUtil.performanceNow();
for (let i = 0; i < amount; i++) {
const channel = new BroadcastChannel(channelName);
const channel = new BroadcastChannel(channelName, options);
channels.push(channel);
}
await Promise.all(
Expand All @@ -42,8 +47,8 @@ describe('performance.test.js', () => {
});
it('sendRecieve.parallel', async () => {
const channelName = AsyncTestUtil.randomString(10);
const channelSender = new BroadcastChannel(channelName);
const channelReciever = new BroadcastChannel(channelName);
const channelSender = new BroadcastChannel(channelName, options);
const channelReciever = new BroadcastChannel(channelName, options);
const msgAmount = 2000;
let emittedCount = 0;
const waitPromise = new Promise(res => {
Expand All @@ -69,8 +74,8 @@ describe('performance.test.js', () => {
});
it('sendRecieve.series', async () => {
const channelName = AsyncTestUtil.randomString(10);
const channelSender = new BroadcastChannel(channelName);
const channelReciever = new BroadcastChannel(channelName);
const channelSender = new BroadcastChannel(channelName, options);
const channelReciever = new BroadcastChannel(channelName, options);
const msgAmount = 600;
let emittedCount = 0;

Expand Down Expand Up @@ -108,9 +113,9 @@ describe('performance.test.js', () => {
while (t > 0) {
t--;
const channelName = AsyncTestUtil.randomString(10);
const channelA = new BroadcastChannel(channelName);
const channelA = new BroadcastChannel(channelName, options);
channelsToClose.push(channelA);
const channelB = new BroadcastChannel(channelName);
const channelB = new BroadcastChannel(channelName, options);
channelsToClose.push(channelB);
const leaderElectorA = createLeaderElection(channelA);
const leaderElectorB = createLeaderElection(channelB);
Expand Down
6 changes: 6 additions & 0 deletions types/broadcast-channel.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ export type BroadcastChannelOptions = {
node?: {
ttl?: number;
useFastPath?: boolean;
/**
* Opening too many write files will throw an error.
* So we ensure we throttle to have a max limit on writes.
* @link https://stackoverflow.com/questions/8965606/node-and-error-emfile-too-many-open-files
*/
maxParallelWrites?: number;
};
idb?: {
ttl?: number;
Expand Down

0 comments on commit 1cdf8a3

Please sign in to comment.