diff --git a/CHANGELOG.md b/CHANGELOG.md index 4bfe57d2..3e780947 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,9 @@ # CHANGELOG -## X.X.X (comming soon) +## 4.7.0 (3 December 2021) + +Bugfixes: + - Prevent `EMFILE, too many open files` error when writing many messages at once. ## 4.6.0 (2 December 2021) diff --git a/package.json b/package.json index 1a3275a1..d92de1a7 100644 --- a/package.json +++ b/package.json @@ -84,6 +84,7 @@ "microseconds": "0.2.0", "nano-time": "1.0.0", "oblivious-set": "1.0.0", + "p-queue": "6.6.2", "rimraf": "3.0.2", "unload": "2.3.1" }, diff --git a/perf.txt b/perf.txt index f6ea34d6..9dcf1976 100644 --- a/perf.txt +++ b/perf.txt @@ -325,6 +325,24 @@ AFTER: { =================================== -2.12.2021 +3.12.2021 =================================== +{ + "openClose": 1110.2557100057602, + "sendRecieve": { + "parallel": 279.0764960050583, + "series": 2797.712993979454 + }, + "leaderElection": 2122.0940190553665 +} + + +{ + "openClose": 885.0331689119339, + "sendRecieve": { + "parallel": 279.1763379573822, + "series": 2232.475461959839 + }, + "leaderElection": 2150.9966419935226 +} diff --git a/src/methods/node.js b/src/methods/node.js index 97770a9a..7b4b6733 100644 --- a/src/methods/node.js +++ b/src/methods/node.js @@ -13,6 +13,7 @@ import path from 'path'; import micro from 'nano-time'; import rimraf from 'rimraf'; import isNode from 'detect-node'; +import PQueue from 'p-queue'; import { add as unloadAdd } from 'unload'; @@ -387,6 +388,12 @@ export async function create(channelName, options = {}) { paths, // contains all messages that have been emitted before emittedMessagesIds: new ObliviousSet(options.node.ttl * 2), + /** + * Used to ensure we do not write too many files at once + * which could throw an error. + * Must always be smaller then options.node.maxParallelWrites + */ + writeFileQueue: new PQueue({ concurrency: options.node.maxParallelWrites }), messagesCallbackTime: null, messagesCallback: null, // ensures we do not read messages in parrallel @@ -533,16 +540,19 @@ export function refreshReaderClients(channelState) { }); } + /** * post a message to the other readers * @return {Promise} */ -export function postMessage(channelState, messageJson) { - const writePromise = writeMessage( - channelState.channelName, - channelState.uuid, - messageJson, - channelState.paths +export async function postMessage(channelState, messageJson) { + const writePromise = channelState.writeFileQueue.add( + () => writeMessage( + channelState.channelName, + channelState.uuid, + messageJson, + channelState.paths + ) ); channelState.writeBlockPromise = channelState.writeBlockPromise.then(async () => { diff --git a/src/options.js b/src/options.js index cd86be12..c56974c5 100644 --- a/src/options.js +++ b/src/options.js @@ -24,6 +24,11 @@ export function fillOptionsWithDefaults(originalOptions = {}) { // node if (!options.node) options.node = {}; if (!options.node.ttl) options.node.ttl = 1000 * 60 * 2; // 2 minutes; + /** + * On linux use 'ulimit -Hn' to get the limit of open files. + * On ubuntu this was 4096 for me, so we use half of that as maxParallelWrites default. + */ + if (!options.node.maxParallelWrites) options.node.maxParallelWrites = 2048; if (typeof options.node.useFastPath === 'undefined') options.node.useFastPath = true; return options; diff --git a/test/issues.test.js b/test/issues.test.js index 106fa246..473e1641 100644 --- a/test/issues.test.js +++ b/test/issues.test.js @@ -43,4 +43,20 @@ describe('issues.test.js', () => { await channel1.close(); await channel2.close(); }); + it('write many messages and then close', async function() { + this.timeout(40 * 1000); + const channelName = AsyncTestUtil.randomString(12); + const channel = new BroadcastChannel(channelName); + new Array(5000) + .fill(0) + .map((_i, idx) => ({ + foo: 'bar', + idx, + longString: AsyncTestUtil.randomString(40) + })) + .map(msg => channel.postMessage(msg)); + + + await channel.close(); + }); }); diff --git a/test/performance.test.js b/test/performance.test.js index 1b83f577..5e344bcf 100644 --- a/test/performance.test.js +++ b/test/performance.test.js @@ -10,6 +10,11 @@ const benchmark = { sendRecieve: {} }; +const options = { + node: { + useFastPath: false + } +}; const elapsedTime = before => { return AsyncTestUtil.performanceNow() - before; @@ -30,7 +35,7 @@ describe('performance.test.js', () => { const startTime = AsyncTestUtil.performanceNow(); for (let i = 0; i < amount; i++) { - const channel = new BroadcastChannel(channelName); + const channel = new BroadcastChannel(channelName, options); channels.push(channel); } await Promise.all( @@ -42,8 +47,8 @@ describe('performance.test.js', () => { }); it('sendRecieve.parallel', async () => { const channelName = AsyncTestUtil.randomString(10); - const channelSender = new BroadcastChannel(channelName); - const channelReciever = new BroadcastChannel(channelName); + const channelSender = new BroadcastChannel(channelName, options); + const channelReciever = new BroadcastChannel(channelName, options); const msgAmount = 2000; let emittedCount = 0; const waitPromise = new Promise(res => { @@ -69,8 +74,8 @@ describe('performance.test.js', () => { }); it('sendRecieve.series', async () => { const channelName = AsyncTestUtil.randomString(10); - const channelSender = new BroadcastChannel(channelName); - const channelReciever = new BroadcastChannel(channelName); + const channelSender = new BroadcastChannel(channelName, options); + const channelReciever = new BroadcastChannel(channelName, options); const msgAmount = 600; let emittedCount = 0; @@ -108,9 +113,9 @@ describe('performance.test.js', () => { while (t > 0) { t--; const channelName = AsyncTestUtil.randomString(10); - const channelA = new BroadcastChannel(channelName); + const channelA = new BroadcastChannel(channelName, options); channelsToClose.push(channelA); - const channelB = new BroadcastChannel(channelName); + const channelB = new BroadcastChannel(channelName, options); channelsToClose.push(channelB); const leaderElectorA = createLeaderElection(channelA); const leaderElectorB = createLeaderElection(channelB); diff --git a/types/broadcast-channel.d.ts b/types/broadcast-channel.d.ts index 10fe2b3a..199e5685 100644 --- a/types/broadcast-channel.d.ts +++ b/types/broadcast-channel.d.ts @@ -26,6 +26,12 @@ export type BroadcastChannelOptions = { node?: { ttl?: number; useFastPath?: boolean; + /** + * Opening too many write files will throw an error. + * So we ensure we throttle to have a max limit on writes. + * @link https://stackoverflow.com/questions/8965606/node-and-error-emfile-too-many-open-files + */ + maxParallelWrites?: number; }; idb?: { ttl?: number;