From f24ac8d23e3a91bcd04d7b1d9866c9fbfe3aeac2 Mon Sep 17 00:00:00 2001 From: pubkey <8926560+pubkey@users.noreply.github.com> Date: Fri, 3 Dec 2021 15:52:31 +0100 Subject: [PATCH] 4.7.0 --- dist/es5node/methods/node.js | 402 ++++++++++++++++++--------------- dist/es5node/options.js | 6 + dist/esbrowser/methods/node.js | 399 +++++++++++++++++--------------- dist/esbrowser/options.js | 6 + dist/esnode/methods/node.js | 399 +++++++++++++++++--------------- dist/esnode/options.js | 6 + dist/lib/browser.js | 6 + dist/lib/browser.min.js | 2 +- dist/lib/methods/node.js | 402 ++++++++++++++++++--------------- dist/lib/options.js | 6 + docs/e2e.js | 6 + docs/iframe.js | 6 + docs/index.js | 6 + docs/leader-iframe.js | 6 + docs/worker.js | 6 + package.json | 2 +- 16 files changed, 926 insertions(+), 740 deletions(-) diff --git a/dist/es5node/methods/node.js b/dist/es5node/methods/node.js index 4c142626..0739edfe 100644 --- a/dist/es5node/methods/node.js +++ b/dist/es5node/methods/node.js @@ -60,6 +60,8 @@ var _rimraf = _interopRequireDefault(require("rimraf")); var _detectNode = _interopRequireDefault(require("detect-node")); +var _pQueue = _interopRequireDefault(require("p-queue")); + var _unload = require("unload"); var _options = require("../options.js"); @@ -152,10 +154,10 @@ function ensureBaseFolderExists() { } function _ensureBaseFolderExists() { - _ensureBaseFolderExists = (0, _asyncToGenerator2["default"])( /*#__PURE__*/_regenerator["default"].mark(function _callee4() { - return _regenerator["default"].wrap(function _callee4$(_context4) { + _ensureBaseFolderExists = (0, _asyncToGenerator2["default"])( /*#__PURE__*/_regenerator["default"].mark(function _callee3() { + return _regenerator["default"].wrap(function _callee3$(_context3) { while (1) { - switch (_context4.prev = _context4.next) { + switch (_context3.prev = _context3.next) { case 0: if (!ENSURE_BASE_FOLDER_EXISTS_PROMISE) { ENSURE_BASE_FOLDER_EXISTS_PROMISE = mkdir(TMP_FOLDER_BASE)["catch"](function () { @@ -163,14 +165,14 @@ function _ensureBaseFolderExists() { }); } - return _context4.abrupt("return", ENSURE_BASE_FOLDER_EXISTS_PROMISE); + return _context3.abrupt("return", ENSURE_BASE_FOLDER_EXISTS_PROMISE); case 2: case "end": - return _context4.stop(); + return _context3.stop(); } } - }, _callee4); + }, _callee3); })); return _ensureBaseFolderExists.apply(this, arguments); } @@ -185,24 +187,24 @@ function ensureFoldersExist(_x, _x2) { function _ensureFoldersExist() { - _ensureFoldersExist = (0, _asyncToGenerator2["default"])( /*#__PURE__*/_regenerator["default"].mark(function _callee5(channelName, paths) { + _ensureFoldersExist = (0, _asyncToGenerator2["default"])( /*#__PURE__*/_regenerator["default"].mark(function _callee4(channelName, paths) { var chmodValue; - return _regenerator["default"].wrap(function _callee5$(_context5) { + return _regenerator["default"].wrap(function _callee4$(_context4) { while (1) { - switch (_context5.prev = _context5.next) { + switch (_context4.prev = _context4.next) { case 0: paths = paths || getPaths(channelName); - _context5.next = 3; + _context4.next = 3; return ensureBaseFolderExists(); case 3: - _context5.next = 5; + _context4.next = 5; return mkdir(paths.channelBase)["catch"](function () { return null; }); case 5: - _context5.next = 7; + _context4.next = 7; return Promise.all([mkdir(paths.readers)["catch"](function () { return null; }), mkdir(paths.messages)["catch"](function () { @@ -212,17 +214,17 @@ function _ensureFoldersExist() { case 7: // set permissions so other users can use the same channel chmodValue = '777'; - _context5.next = 10; + _context4.next = 10; return Promise.all([chmod(paths.channelBase, chmodValue), chmod(paths.readers, chmodValue), chmod(paths.messages, chmodValue)])["catch"](function () { return null; }); case 10: case "end": - return _context5.stop(); + return _context4.stop(); } } - }, _callee5); + }, _callee4); })); return _ensureFoldersExist.apply(this, arguments); } @@ -232,13 +234,13 @@ function clearNodeFolder() { } function _clearNodeFolder() { - _clearNodeFolder = (0, _asyncToGenerator2["default"])( /*#__PURE__*/_regenerator["default"].mark(function _callee6() { - return _regenerator["default"].wrap(function _callee6$(_context6) { + _clearNodeFolder = (0, _asyncToGenerator2["default"])( /*#__PURE__*/_regenerator["default"].mark(function _callee5() { + return _regenerator["default"].wrap(function _callee5$(_context5) { while (1) { - switch (_context6.prev = _context6.next) { + switch (_context5.prev = _context5.next) { case 0: if (!(!TMP_FOLDER_BASE || TMP_FOLDER_BASE === '' || TMP_FOLDER_BASE === '/')) { - _context6.next = 2; + _context5.next = 2; break; } @@ -246,19 +248,19 @@ function _clearNodeFolder() { case 2: ENSURE_BASE_FOLDER_EXISTS_PROMISE = null; - _context6.next = 5; + _context5.next = 5; return removeDir(TMP_FOLDER_BASE); case 5: ENSURE_BASE_FOLDER_EXISTS_PROMISE = null; - return _context6.abrupt("return", true); + return _context5.abrupt("return", true); case 7: case "end": - return _context6.stop(); + return _context5.stop(); } } - }, _callee6); + }, _callee5); })); return _clearNodeFolder.apply(this, arguments); } @@ -304,29 +306,29 @@ function countChannelFolders() { } function _countChannelFolders() { - _countChannelFolders = (0, _asyncToGenerator2["default"])( /*#__PURE__*/_regenerator["default"].mark(function _callee7() { + _countChannelFolders = (0, _asyncToGenerator2["default"])( /*#__PURE__*/_regenerator["default"].mark(function _callee6() { var folders; - return _regenerator["default"].wrap(function _callee7$(_context7) { + return _regenerator["default"].wrap(function _callee6$(_context6) { while (1) { - switch (_context7.prev = _context7.next) { + switch (_context6.prev = _context6.next) { case 0: - _context7.next = 2; + _context6.next = 2; return ensureBaseFolderExists(); case 2: - _context7.next = 4; + _context6.next = 4; return readdir(TMP_FOLDER_BASE); case 4: - folders = _context7.sent; - return _context7.abrupt("return", folders.length); + folders = _context6.sent; + return _context6.abrupt("return", folders.length); case 6: case "end": - return _context7.stop(); + return _context6.stop(); } } - }, _callee7); + }, _callee6); })); return _countChannelFolders.apply(this, arguments); } @@ -341,42 +343,42 @@ function connectionError(_x3) { function _connectionError() { - _connectionError = (0, _asyncToGenerator2["default"])( /*#__PURE__*/_regenerator["default"].mark(function _callee8(originalError) { + _connectionError = (0, _asyncToGenerator2["default"])( /*#__PURE__*/_regenerator["default"].mark(function _callee7(originalError) { var count, addObj, text, newError; - return _regenerator["default"].wrap(function _callee8$(_context8) { + return _regenerator["default"].wrap(function _callee7$(_context7) { while (1) { - switch (_context8.prev = _context8.next) { + switch (_context7.prev = _context7.next) { case 0: - _context8.next = 2; + _context7.next = 2; return countChannelFolders(); case 2: - count = _context8.sent; + count = _context7.sent; if (!(count < 30)) { - _context8.next = 5; + _context7.next = 5; break; } - return _context8.abrupt("return", originalError); + return _context7.abrupt("return", originalError); case 5: addObj = {}; - Object.entries(originalError).forEach(function (_ref4) { - var k = _ref4[0], - v = _ref4[1]; + Object.entries(originalError).forEach(function (_ref3) { + var k = _ref3[0], + v = _ref3[1]; return addObj[k] = v; }); text = 'BroadcastChannel.create(): error: ' + 'This might happen if you have created to many channels, ' + 'like when you use BroadcastChannel in unit-tests.' + 'Try using BroadcastChannel.clearNodeFolder() to clear the tmp-folder before each test.' + 'See https://github.com/pubkey/broadcast-channel#clear-tmp-folder'; newError = new Error(text + ': ' + JSON.stringify(addObj, null, 2)); - return _context8.abrupt("return", newError); + return _context7.abrupt("return", newError); case 10: case "end": - return _context8.stop(); + return _context7.stop(); } } - }, _callee8); + }, _callee7); })); return _connectionError.apply(this, arguments); } @@ -386,11 +388,11 @@ function createSocketEventEmitter(_x4, _x5, _x6) { } function _createSocketEventEmitter() { - _createSocketEventEmitter = (0, _asyncToGenerator2["default"])( /*#__PURE__*/_regenerator["default"].mark(function _callee11(channelName, readerUuid, paths) { + _createSocketEventEmitter = (0, _asyncToGenerator2["default"])( /*#__PURE__*/_regenerator["default"].mark(function _callee10(channelName, readerUuid, paths) { var pathToSocket, emitter, server; - return _regenerator["default"].wrap(function _callee11$(_context11) { + return _regenerator["default"].wrap(function _callee10$(_context10) { while (1) { - switch (_context11.prev = _context11.next) { + switch (_context10.prev = _context10.next) { case 0: pathToSocket = socketPath(channelName, readerUuid, paths); emitter = new _events["default"].EventEmitter(); @@ -400,53 +402,53 @@ function _createSocketEventEmitter() { emitter.emit('data', msg.toString()); }); }); - _context11.next = 5; + _context10.next = 5; return new Promise(function (resolve, reject) { server.on('error', /*#__PURE__*/function () { - var _ref5 = (0, _asyncToGenerator2["default"])( /*#__PURE__*/_regenerator["default"].mark(function _callee9(err) { + var _ref4 = (0, _asyncToGenerator2["default"])( /*#__PURE__*/_regenerator["default"].mark(function _callee8(err) { var useErr; - return _regenerator["default"].wrap(function _callee9$(_context9) { + return _regenerator["default"].wrap(function _callee8$(_context8) { while (1) { - switch (_context9.prev = _context9.next) { + switch (_context8.prev = _context8.next) { case 0: - _context9.next = 2; + _context8.next = 2; return connectionError(err); case 2: - useErr = _context9.sent; + useErr = _context8.sent; reject(useErr); case 4: case "end": - return _context9.stop(); + return _context8.stop(); } } - }, _callee9); + }, _callee8); })); - return function (_x24) { - return _ref5.apply(this, arguments); + return function (_x26) { + return _ref4.apply(this, arguments); }; }()); server.listen(pathToSocket, /*#__PURE__*/function () { - var _ref6 = (0, _asyncToGenerator2["default"])( /*#__PURE__*/_regenerator["default"].mark(function _callee10(err, res) { + var _ref5 = (0, _asyncToGenerator2["default"])( /*#__PURE__*/_regenerator["default"].mark(function _callee9(err, res) { var useErr; - return _regenerator["default"].wrap(function _callee10$(_context10) { + return _regenerator["default"].wrap(function _callee9$(_context9) { while (1) { - switch (_context10.prev = _context10.next) { + switch (_context9.prev = _context9.next) { case 0: if (!err) { - _context10.next = 7; + _context9.next = 7; break; } - _context10.next = 3; + _context9.next = 3; return connectionError(err); case 3: - useErr = _context10.sent; + useErr = _context9.sent; reject(useErr); - _context10.next = 8; + _context9.next = 8; break; case 7: @@ -454,20 +456,20 @@ function _createSocketEventEmitter() { case 8: case "end": - return _context10.stop(); + return _context9.stop(); } } - }, _callee10); + }, _callee9); })); - return function (_x25, _x26) { - return _ref6.apply(this, arguments); + return function (_x27, _x28) { + return _ref5.apply(this, arguments); }; }()); }); case 5: - return _context11.abrupt("return", { + return _context10.abrupt("return", { path: pathToSocket, emitter: emitter, server: server @@ -475,10 +477,10 @@ function _createSocketEventEmitter() { case 6: case "end": - return _context11.stop(); + return _context10.stop(); } } - }, _callee11); + }, _callee10); })); return _createSocketEventEmitter.apply(this, arguments); } @@ -494,15 +496,15 @@ function openClientConnection(_x7, _x8) { function _openClientConnection() { - _openClientConnection = (0, _asyncToGenerator2["default"])( /*#__PURE__*/_regenerator["default"].mark(function _callee12(channelName, readerUuid) { + _openClientConnection = (0, _asyncToGenerator2["default"])( /*#__PURE__*/_regenerator["default"].mark(function _callee11(channelName, readerUuid) { var pathToSocket, client; - return _regenerator["default"].wrap(function _callee12$(_context12) { + return _regenerator["default"].wrap(function _callee11$(_context11) { while (1) { - switch (_context12.prev = _context12.next) { + switch (_context11.prev = _context11.next) { case 0: pathToSocket = socketPath(channelName, readerUuid); client = new _net["default"].Socket(); - return _context12.abrupt("return", new Promise(function (res, rej) { + return _context11.abrupt("return", new Promise(function (res, rej) { client.connect(pathToSocket, function () { return res(client); }); @@ -513,10 +515,10 @@ function _openClientConnection() { case 3: case "end": - return _context12.stop(); + return _context11.stop(); } } - }, _callee12); + }, _callee11); })); return _openClientConnection.apply(this, arguments); } @@ -554,20 +556,20 @@ function getReadersUuids(_x9, _x10) { } function _getReadersUuids() { - _getReadersUuids = (0, _asyncToGenerator2["default"])( /*#__PURE__*/_regenerator["default"].mark(function _callee13(channelName, paths) { + _getReadersUuids = (0, _asyncToGenerator2["default"])( /*#__PURE__*/_regenerator["default"].mark(function _callee12(channelName, paths) { var readersPath, files; - return _regenerator["default"].wrap(function _callee13$(_context13) { + return _regenerator["default"].wrap(function _callee12$(_context12) { while (1) { - switch (_context13.prev = _context13.next) { + switch (_context12.prev = _context12.next) { case 0: paths = paths || getPaths(channelName); readersPath = paths.readers; - _context13.next = 4; + _context12.next = 4; return readdir(readersPath); case 4: - files = _context13.sent; - return _context13.abrupt("return", files.map(function (file) { + files = _context12.sent; + return _context12.abrupt("return", files.map(function (file) { return file.split('.'); }).filter(function (split) { return split[1] === 'json'; @@ -578,10 +580,10 @@ function _getReadersUuids() { case 6: case "end": - return _context13.stop(); + return _context12.stop(); } } - }, _callee13); + }, _callee12); })); return _getReadersUuids.apply(this, arguments); } @@ -591,22 +593,22 @@ function messagePath(_x11, _x12, _x13, _x14) { } function _messagePath() { - _messagePath = (0, _asyncToGenerator2["default"])( /*#__PURE__*/_regenerator["default"].mark(function _callee14(channelName, time, token, writerUuid) { + _messagePath = (0, _asyncToGenerator2["default"])( /*#__PURE__*/_regenerator["default"].mark(function _callee13(channelName, time, token, writerUuid) { var fileName, msgPath; - return _regenerator["default"].wrap(function _callee14$(_context14) { + return _regenerator["default"].wrap(function _callee13$(_context13) { while (1) { - switch (_context14.prev = _context14.next) { + switch (_context13.prev = _context13.next) { case 0: fileName = time + '_' + writerUuid + '_' + token + '.json'; msgPath = _path["default"].join(getPaths(channelName).messages, fileName); - return _context14.abrupt("return", msgPath); + return _context13.abrupt("return", msgPath); case 3: case "end": - return _context14.stop(); + return _context13.stop(); } } - }, _callee14); + }, _callee13); })); return _messagePath.apply(this, arguments); } @@ -616,20 +618,20 @@ function getAllMessages(_x15, _x16) { } function _getAllMessages() { - _getAllMessages = (0, _asyncToGenerator2["default"])( /*#__PURE__*/_regenerator["default"].mark(function _callee15(channelName, paths) { + _getAllMessages = (0, _asyncToGenerator2["default"])( /*#__PURE__*/_regenerator["default"].mark(function _callee14(channelName, paths) { var messagesPath, files; - return _regenerator["default"].wrap(function _callee15$(_context15) { + return _regenerator["default"].wrap(function _callee14$(_context14) { while (1) { - switch (_context15.prev = _context15.next) { + switch (_context14.prev = _context14.next) { case 0: paths = paths || getPaths(channelName); messagesPath = paths.messages; - _context15.next = 4; + _context14.next = 4; return readdir(messagesPath); case 4: - files = _context15.sent; - return _context15.abrupt("return", files.map(function (file) { + files = _context14.sent; + return _context14.abrupt("return", files.map(function (file) { var fileName = file.split('.')[0]; var split = fileName.split('_'); return { @@ -642,10 +644,10 @@ function _getAllMessages() { case 6: case "end": - return _context15.stop(); + return _context14.stop(); } } - }, _callee15); + }, _callee14); })); return _getAllMessages.apply(this, arguments); } @@ -671,14 +673,14 @@ function cleanOldMessages(_x17, _x18) { } function _cleanOldMessages() { - _cleanOldMessages = (0, _asyncToGenerator2["default"])( /*#__PURE__*/_regenerator["default"].mark(function _callee16(messageObjects, ttl) { + _cleanOldMessages = (0, _asyncToGenerator2["default"])( /*#__PURE__*/_regenerator["default"].mark(function _callee15(messageObjects, ttl) { var olderThen; - return _regenerator["default"].wrap(function _callee16$(_context16) { + return _regenerator["default"].wrap(function _callee15$(_context15) { while (1) { - switch (_context16.prev = _context16.next) { + switch (_context15.prev = _context15.next) { case 0: olderThen = Date.now() - ttl; - _context16.next = 3; + _context15.next = 3; return Promise.all(messageObjects.filter(function (obj) { return obj.time / 1000 < olderThen; }).map(function (obj) { @@ -689,10 +691,10 @@ function _cleanOldMessages() { case 3: case "end": - return _context16.stop(); + return _context15.stop(); } } - }, _callee16); + }, _callee15); })); return _cleanOldMessages.apply(this, arguments); } @@ -710,23 +712,23 @@ function create(_x19) { } function _create() { - _create = (0, _asyncToGenerator2["default"])( /*#__PURE__*/_regenerator["default"].mark(function _callee17(channelName) { + _create = (0, _asyncToGenerator2["default"])( /*#__PURE__*/_regenerator["default"].mark(function _callee16(channelName) { var options, time, paths, ensureFolderExistsPromise, uuid, state, - _yield$Promise$all2, + _yield$Promise$all, socketEE, infoFilePath, - _args17 = arguments; + _args16 = arguments; - return _regenerator["default"].wrap(function _callee17$(_context17) { + return _regenerator["default"].wrap(function _callee16$(_context16) { while (1) { - switch (_context17.prev = _context17.next) { + switch (_context16.prev = _context16.next) { case 0: - options = _args17.length > 1 && _args17[1] !== undefined ? _args17[1] : {}; + options = _args16.length > 1 && _args16[1] !== undefined ? _args16[1] : {}; options = (0, _options.fillOptionsWithDefaults)(options); time = microSeconds(); paths = getPaths(channelName); @@ -740,6 +742,15 @@ function _create() { paths: paths, // contains all messages that have been emitted before emittedMessagesIds: new _obliviousSet.ObliviousSet(options.node.ttl * 2), + + /** + * Used to ensure we do not write too many files at once + * which could throw an error. + * Must always be smaller then options.node.maxParallelWrites + */ + writeFileQueue: new _pQueue["default"]({ + concurrency: options.node.maxParallelWrites + }), messagesCallbackTime: null, messagesCallback: null, // ensures we do not read messages in parrallel @@ -753,17 +764,17 @@ function _create() { }; if (!OTHER_INSTANCES[channelName]) OTHER_INSTANCES[channelName] = []; OTHER_INSTANCES[channelName].push(state); - _context17.next = 11; + _context16.next = 11; return ensureFolderExistsPromise; case 11: - _context17.next = 13; + _context16.next = 13; return Promise.all([createSocketEventEmitter(channelName, uuid, paths), createSocketInfoFile(channelName, uuid, paths), refreshReaderClients(state)]); case 13: - _yield$Promise$all2 = _context17.sent; - socketEE = _yield$Promise$all2[0]; - infoFilePath = _yield$Promise$all2[1]; + _yield$Promise$all = _context16.sent; + socketEE = _yield$Promise$all[0]; + infoFilePath = _yield$Promise$all[1]; state.socketEE = socketEE; state.infoFilePath = infoFilePath; // when new message comes in, we read it and emit it @@ -782,14 +793,14 @@ function _create() { } }); }); - return _context17.abrupt("return", state); + return _context16.abrupt("return", state); case 20: case "end": - return _context17.stop(); + return _context16.stop(); } } - }, _callee17); + }, _callee16); })); return _create.apply(this, arguments); } @@ -824,31 +835,31 @@ function handleMessagePing(_x20, _x21) { function _handleMessagePing() { - _handleMessagePing = (0, _asyncToGenerator2["default"])( /*#__PURE__*/_regenerator["default"].mark(function _callee18(state, msgObj) { + _handleMessagePing = (0, _asyncToGenerator2["default"])( /*#__PURE__*/_regenerator["default"].mark(function _callee17(state, msgObj) { var messages, useMessages; - return _regenerator["default"].wrap(function _callee18$(_context18) { + return _regenerator["default"].wrap(function _callee17$(_context17) { while (1) { - switch (_context18.prev = _context18.next) { + switch (_context17.prev = _context17.next) { case 0: if (state.messagesCallback) { - _context18.next = 2; + _context17.next = 2; break; } - return _context18.abrupt("return"); + return _context17.abrupt("return"); case 2: if (msgObj) { - _context18.next = 8; + _context17.next = 8; break; } - _context18.next = 5; + _context17.next = 5; return getAllMessages(state.channelName, state.paths); case 5: - messages = _context18.sent; - _context18.next = 9; + messages = _context17.sent; + _context17.next = 9; break; case 8: @@ -864,14 +875,14 @@ function _handleMessagePing() { // if no listener or message, so not do anything if (!(!useMessages.length || !state.messagesCallback)) { - _context18.next = 12; + _context17.next = 12; break; } - return _context18.abrupt("return"); + return _context17.abrupt("return"); case 12: - _context18.next = 14; + _context17.next = 14; return Promise.all(useMessages.map(function (msgObj) { return readMessage(msgObj).then(function (content) { return msgObj.content = content; @@ -890,10 +901,10 @@ function _handleMessagePing() { case 15: case "end": - return _context18.stop(); + return _context17.stop(); } } - }, _callee18); + }, _callee17); })); return _handleMessagePing.apply(this, arguments); } @@ -1002,68 +1013,89 @@ function refreshReaderClients(channelState) { */ -function postMessage(channelState, messageJson) { - var writePromise = writeMessage(channelState.channelName, channelState.uuid, messageJson, channelState.paths); - channelState.writeBlockPromise = channelState.writeBlockPromise.then( /*#__PURE__*/(0, _asyncToGenerator2["default"])( /*#__PURE__*/_regenerator["default"].mark(function _callee3() { - var _yield$Promise$all, msgObj, pingStr, writeToReadersPromise; +function postMessage(_x24, _x25) { + return _postMessage.apply(this, arguments); +} +/** + * When multiple BroadcastChannels with the same name + * are created in a single node-process, we can access them directly and emit messages. + * This might not happen often in production + * but will speed up things when this module is used in unit-tests. + */ + - return _regenerator["default"].wrap(function _callee3$(_context3) { +function _postMessage() { + _postMessage = (0, _asyncToGenerator2["default"])( /*#__PURE__*/_regenerator["default"].mark(function _callee19(channelState, messageJson) { + var writePromise; + return _regenerator["default"].wrap(function _callee19$(_context19) { while (1) { - switch (_context3.prev = _context3.next) { + switch (_context19.prev = _context19.next) { case 0: - _context3.next = 2; - return new Promise(function (res) { - return setTimeout(res, 0); + writePromise = channelState.writeFileQueue.add(function () { + return writeMessage(channelState.channelName, channelState.uuid, messageJson, channelState.paths); }); + channelState.writeBlockPromise = channelState.writeBlockPromise.then( /*#__PURE__*/(0, _asyncToGenerator2["default"])( /*#__PURE__*/_regenerator["default"].mark(function _callee18() { + var _yield$Promise$all2, msgObj, pingStr, writeToReadersPromise; + + return _regenerator["default"].wrap(function _callee18$(_context18) { + while (1) { + switch (_context18.prev = _context18.next) { + case 0: + _context18.next = 2; + return new Promise(function (res) { + return setTimeout(res, 0); + }); + + case 2: + _context18.next = 4; + return Promise.all([writePromise, refreshReaderClients(channelState)]); + + case 4: + _yield$Promise$all2 = _context18.sent; + msgObj = _yield$Promise$all2[0]; + emitOverFastPath(channelState, msgObj, messageJson); + pingStr = '{"t":' + msgObj.time + ',"u":"' + msgObj.uuid + '","to":"' + msgObj.token + '"}|'; + writeToReadersPromise = Promise.all(Object.values(channelState.otherReaderClients).filter(function (client) { + return client.writable; + }) // client might have closed in between + .map(function (client) { + return new Promise(function (res) { + client.write(pingStr, res); + }); + })); + /** + * clean up old messages + * to not waste resources on cleaning up, + * only if random-int matches, we clean up old messages + */ + + if ((0, _util2.randomInt)(0, 20) === 0) { + /* await */ + getAllMessages(channelState.channelName, channelState.paths).then(function (allMessages) { + return cleanOldMessages(allMessages, channelState.options.node.ttl); + }); + } - case 2: - _context3.next = 4; - return Promise.all([writePromise, refreshReaderClients(channelState)]); + return _context18.abrupt("return", writeToReadersPromise); - case 4: - _yield$Promise$all = _context3.sent; - msgObj = _yield$Promise$all[0]; - emitOverFastPath(channelState, msgObj, messageJson); - pingStr = '{"t":' + msgObj.time + ',"u":"' + msgObj.uuid + '","to":"' + msgObj.token + '"}|'; - writeToReadersPromise = Promise.all(Object.values(channelState.otherReaderClients).filter(function (client) { - return client.writable; - }) // client might have closed in between - .map(function (client) { - return new Promise(function (res) { - client.write(pingStr, res); - }); - })); - /** - * clean up old messages - * to not waste resources on cleaning up, - * only if random-int matches, we clean up old messages - */ - - if ((0, _util2.randomInt)(0, 20) === 0) { - /* await */ - getAllMessages(channelState.channelName, channelState.paths).then(function (allMessages) { - return cleanOldMessages(allMessages, channelState.options.node.ttl); - }); - } - - return _context3.abrupt("return", writeToReadersPromise); + case 11: + case "end": + return _context18.stop(); + } + } + }, _callee18); + }))); + return _context19.abrupt("return", channelState.writeBlockPromise); - case 11: + case 3: case "end": - return _context3.stop(); + return _context19.stop(); } } - }, _callee3); - }))); - return channelState.writeBlockPromise; + }, _callee19); + })); + return _postMessage.apply(this, arguments); } -/** - * When multiple BroadcastChannels with the same name - * are created in a single node-process, we can access them directly and emit messages. - * This might not happen often in production - * but will speed up things when this module is used in unit-tests. - */ - function emitOverFastPath(state, msgObj, messageJson) { if (!state.options.node.useFastPath) { diff --git a/dist/es5node/options.js b/dist/es5node/options.js index 18d4d8a5..c8435340 100644 --- a/dist/es5node/options.js +++ b/dist/es5node/options.js @@ -26,6 +26,12 @@ function fillOptionsWithDefaults() { if (!options.node) options.node = {}; if (!options.node.ttl) options.node.ttl = 1000 * 60 * 2; // 2 minutes; + /** + * On linux use 'ulimit -Hn' to get the limit of open files. + * On ubuntu this was 4096 for me, so we use half of that as maxParallelWrites default. + */ + + if (!options.node.maxParallelWrites) options.node.maxParallelWrites = 2048; if (typeof options.node.useFastPath === 'undefined') options.node.useFastPath = true; return options; } \ No newline at end of file diff --git a/dist/esbrowser/methods/node.js b/dist/esbrowser/methods/node.js index 880bf616..3ce81b85 100644 --- a/dist/esbrowser/methods/node.js +++ b/dist/esbrowser/methods/node.js @@ -15,6 +15,7 @@ import path from 'path'; import micro from 'nano-time'; import rimraf from 'rimraf'; import isNode from 'detect-node'; +import PQueue from 'p-queue'; import { add as unloadAdd } from 'unload'; import { fillOptionsWithDefaults } from '../options.js'; import { randomInt, randomToken, PROMISE_RESOLVED_VOID } from '../util.js'; @@ -81,10 +82,10 @@ function ensureBaseFolderExists() { } function _ensureBaseFolderExists() { - _ensureBaseFolderExists = _asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee4() { - return _regeneratorRuntime.wrap(function _callee4$(_context4) { + _ensureBaseFolderExists = _asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee3() { + return _regeneratorRuntime.wrap(function _callee3$(_context3) { while (1) { - switch (_context4.prev = _context4.next) { + switch (_context3.prev = _context3.next) { case 0: if (!ENSURE_BASE_FOLDER_EXISTS_PROMISE) { ENSURE_BASE_FOLDER_EXISTS_PROMISE = mkdir(TMP_FOLDER_BASE)["catch"](function () { @@ -92,14 +93,14 @@ function _ensureBaseFolderExists() { }); } - return _context4.abrupt("return", ENSURE_BASE_FOLDER_EXISTS_PROMISE); + return _context3.abrupt("return", ENSURE_BASE_FOLDER_EXISTS_PROMISE); case 2: case "end": - return _context4.stop(); + return _context3.stop(); } } - }, _callee4); + }, _callee3); })); return _ensureBaseFolderExists.apply(this, arguments); } @@ -113,24 +114,24 @@ export function ensureFoldersExist(_x, _x2) { */ function _ensureFoldersExist() { - _ensureFoldersExist = _asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee5(channelName, paths) { + _ensureFoldersExist = _asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee4(channelName, paths) { var chmodValue; - return _regeneratorRuntime.wrap(function _callee5$(_context5) { + return _regeneratorRuntime.wrap(function _callee4$(_context4) { while (1) { - switch (_context5.prev = _context5.next) { + switch (_context4.prev = _context4.next) { case 0: paths = paths || getPaths(channelName); - _context5.next = 3; + _context4.next = 3; return ensureBaseFolderExists(); case 3: - _context5.next = 5; + _context4.next = 5; return mkdir(paths.channelBase)["catch"](function () { return null; }); case 5: - _context5.next = 7; + _context4.next = 7; return Promise.all([mkdir(paths.readers)["catch"](function () { return null; }), mkdir(paths.messages)["catch"](function () { @@ -140,17 +141,17 @@ function _ensureFoldersExist() { case 7: // set permissions so other users can use the same channel chmodValue = '777'; - _context5.next = 10; + _context4.next = 10; return Promise.all([chmod(paths.channelBase, chmodValue), chmod(paths.readers, chmodValue), chmod(paths.messages, chmodValue)])["catch"](function () { return null; }); case 10: case "end": - return _context5.stop(); + return _context4.stop(); } } - }, _callee5); + }, _callee4); })); return _ensureFoldersExist.apply(this, arguments); } @@ -160,13 +161,13 @@ export function clearNodeFolder() { } function _clearNodeFolder() { - _clearNodeFolder = _asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee6() { - return _regeneratorRuntime.wrap(function _callee6$(_context6) { + _clearNodeFolder = _asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee5() { + return _regeneratorRuntime.wrap(function _callee5$(_context5) { while (1) { - switch (_context6.prev = _context6.next) { + switch (_context5.prev = _context5.next) { case 0: if (!(!TMP_FOLDER_BASE || TMP_FOLDER_BASE === '' || TMP_FOLDER_BASE === '/')) { - _context6.next = 2; + _context5.next = 2; break; } @@ -174,19 +175,19 @@ function _clearNodeFolder() { case 2: ENSURE_BASE_FOLDER_EXISTS_PROMISE = null; - _context6.next = 5; + _context5.next = 5; return removeDir(TMP_FOLDER_BASE); case 5: ENSURE_BASE_FOLDER_EXISTS_PROMISE = null; - return _context6.abrupt("return", true); + return _context5.abrupt("return", true); case 7: case "end": - return _context6.stop(); + return _context5.stop(); } } - }, _callee6); + }, _callee5); })); return _clearNodeFolder.apply(this, arguments); } @@ -225,29 +226,29 @@ export function countChannelFolders() { } function _countChannelFolders() { - _countChannelFolders = _asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee7() { + _countChannelFolders = _asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee6() { var folders; - return _regeneratorRuntime.wrap(function _callee7$(_context7) { + return _regeneratorRuntime.wrap(function _callee6$(_context6) { while (1) { - switch (_context7.prev = _context7.next) { + switch (_context6.prev = _context6.next) { case 0: - _context7.next = 2; + _context6.next = 2; return ensureBaseFolderExists(); case 2: - _context7.next = 4; + _context6.next = 4; return readdir(TMP_FOLDER_BASE); case 4: - folders = _context7.sent; - return _context7.abrupt("return", folders.length); + folders = _context6.sent; + return _context6.abrupt("return", folders.length); case 6: case "end": - return _context7.stop(); + return _context6.stop(); } } - }, _callee7); + }, _callee6); })); return _countChannelFolders.apply(this, arguments); } @@ -262,42 +263,42 @@ function connectionError(_x3) { function _connectionError() { - _connectionError = _asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee8(originalError) { + _connectionError = _asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee7(originalError) { var count, addObj, text, newError; - return _regeneratorRuntime.wrap(function _callee8$(_context8) { + return _regeneratorRuntime.wrap(function _callee7$(_context7) { while (1) { - switch (_context8.prev = _context8.next) { + switch (_context7.prev = _context7.next) { case 0: - _context8.next = 2; + _context7.next = 2; return countChannelFolders(); case 2: - count = _context8.sent; + count = _context7.sent; if (!(count < 30)) { - _context8.next = 5; + _context7.next = 5; break; } - return _context8.abrupt("return", originalError); + return _context7.abrupt("return", originalError); case 5: addObj = {}; - Object.entries(originalError).forEach(function (_ref4) { - var k = _ref4[0], - v = _ref4[1]; + Object.entries(originalError).forEach(function (_ref3) { + var k = _ref3[0], + v = _ref3[1]; return addObj[k] = v; }); text = 'BroadcastChannel.create(): error: ' + 'This might happen if you have created to many channels, ' + 'like when you use BroadcastChannel in unit-tests.' + 'Try using BroadcastChannel.clearNodeFolder() to clear the tmp-folder before each test.' + 'See https://github.com/pubkey/broadcast-channel#clear-tmp-folder'; newError = new Error(text + ': ' + JSON.stringify(addObj, null, 2)); - return _context8.abrupt("return", newError); + return _context7.abrupt("return", newError); case 10: case "end": - return _context8.stop(); + return _context7.stop(); } } - }, _callee8); + }, _callee7); })); return _connectionError.apply(this, arguments); } @@ -307,11 +308,11 @@ export function createSocketEventEmitter(_x4, _x5, _x6) { } function _createSocketEventEmitter() { - _createSocketEventEmitter = _asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee11(channelName, readerUuid, paths) { + _createSocketEventEmitter = _asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee10(channelName, readerUuid, paths) { var pathToSocket, emitter, server; - return _regeneratorRuntime.wrap(function _callee11$(_context11) { + return _regeneratorRuntime.wrap(function _callee10$(_context10) { while (1) { - switch (_context11.prev = _context11.next) { + switch (_context10.prev = _context10.next) { case 0: pathToSocket = socketPath(channelName, readerUuid, paths); emitter = new events.EventEmitter(); @@ -321,53 +322,53 @@ function _createSocketEventEmitter() { emitter.emit('data', msg.toString()); }); }); - _context11.next = 5; + _context10.next = 5; return new Promise(function (resolve, reject) { server.on('error', /*#__PURE__*/function () { - var _ref5 = _asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee9(err) { + var _ref4 = _asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee8(err) { var useErr; - return _regeneratorRuntime.wrap(function _callee9$(_context9) { + return _regeneratorRuntime.wrap(function _callee8$(_context8) { while (1) { - switch (_context9.prev = _context9.next) { + switch (_context8.prev = _context8.next) { case 0: - _context9.next = 2; + _context8.next = 2; return connectionError(err); case 2: - useErr = _context9.sent; + useErr = _context8.sent; reject(useErr); case 4: case "end": - return _context9.stop(); + return _context8.stop(); } } - }, _callee9); + }, _callee8); })); - return function (_x24) { - return _ref5.apply(this, arguments); + return function (_x26) { + return _ref4.apply(this, arguments); }; }()); server.listen(pathToSocket, /*#__PURE__*/function () { - var _ref6 = _asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee10(err, res) { + var _ref5 = _asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee9(err, res) { var useErr; - return _regeneratorRuntime.wrap(function _callee10$(_context10) { + return _regeneratorRuntime.wrap(function _callee9$(_context9) { while (1) { - switch (_context10.prev = _context10.next) { + switch (_context9.prev = _context9.next) { case 0: if (!err) { - _context10.next = 7; + _context9.next = 7; break; } - _context10.next = 3; + _context9.next = 3; return connectionError(err); case 3: - useErr = _context10.sent; + useErr = _context9.sent; reject(useErr); - _context10.next = 8; + _context9.next = 8; break; case 7: @@ -375,20 +376,20 @@ function _createSocketEventEmitter() { case 8: case "end": - return _context10.stop(); + return _context9.stop(); } } - }, _callee10); + }, _callee9); })); - return function (_x25, _x26) { - return _ref6.apply(this, arguments); + return function (_x27, _x28) { + return _ref5.apply(this, arguments); }; }()); }); case 5: - return _context11.abrupt("return", { + return _context10.abrupt("return", { path: pathToSocket, emitter: emitter, server: server @@ -396,10 +397,10 @@ function _createSocketEventEmitter() { case 6: case "end": - return _context11.stop(); + return _context10.stop(); } } - }, _callee11); + }, _callee10); })); return _createSocketEventEmitter.apply(this, arguments); } @@ -414,15 +415,15 @@ export function openClientConnection(_x7, _x8) { */ function _openClientConnection() { - _openClientConnection = _asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee12(channelName, readerUuid) { + _openClientConnection = _asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee11(channelName, readerUuid) { var pathToSocket, client; - return _regeneratorRuntime.wrap(function _callee12$(_context12) { + return _regeneratorRuntime.wrap(function _callee11$(_context11) { while (1) { - switch (_context12.prev = _context12.next) { + switch (_context11.prev = _context11.next) { case 0: pathToSocket = socketPath(channelName, readerUuid); client = new net.Socket(); - return _context12.abrupt("return", new Promise(function (res, rej) { + return _context11.abrupt("return", new Promise(function (res, rej) { client.connect(pathToSocket, function () { return res(client); }); @@ -433,10 +434,10 @@ function _openClientConnection() { case 3: case "end": - return _context12.stop(); + return _context11.stop(); } } - }, _callee12); + }, _callee11); })); return _openClientConnection.apply(this, arguments); } @@ -471,20 +472,20 @@ export function getReadersUuids(_x9, _x10) { } function _getReadersUuids() { - _getReadersUuids = _asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee13(channelName, paths) { + _getReadersUuids = _asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee12(channelName, paths) { var readersPath, files; - return _regeneratorRuntime.wrap(function _callee13$(_context13) { + return _regeneratorRuntime.wrap(function _callee12$(_context12) { while (1) { - switch (_context13.prev = _context13.next) { + switch (_context12.prev = _context12.next) { case 0: paths = paths || getPaths(channelName); readersPath = paths.readers; - _context13.next = 4; + _context12.next = 4; return readdir(readersPath); case 4: - files = _context13.sent; - return _context13.abrupt("return", files.map(function (file) { + files = _context12.sent; + return _context12.abrupt("return", files.map(function (file) { return file.split('.'); }).filter(function (split) { return split[1] === 'json'; @@ -495,10 +496,10 @@ function _getReadersUuids() { case 6: case "end": - return _context13.stop(); + return _context12.stop(); } } - }, _callee13); + }, _callee12); })); return _getReadersUuids.apply(this, arguments); } @@ -508,22 +509,22 @@ export function messagePath(_x11, _x12, _x13, _x14) { } function _messagePath() { - _messagePath = _asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee14(channelName, time, token, writerUuid) { + _messagePath = _asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee13(channelName, time, token, writerUuid) { var fileName, msgPath; - return _regeneratorRuntime.wrap(function _callee14$(_context14) { + return _regeneratorRuntime.wrap(function _callee13$(_context13) { while (1) { - switch (_context14.prev = _context14.next) { + switch (_context13.prev = _context13.next) { case 0: fileName = time + '_' + writerUuid + '_' + token + '.json'; msgPath = path.join(getPaths(channelName).messages, fileName); - return _context14.abrupt("return", msgPath); + return _context13.abrupt("return", msgPath); case 3: case "end": - return _context14.stop(); + return _context13.stop(); } } - }, _callee14); + }, _callee13); })); return _messagePath.apply(this, arguments); } @@ -533,20 +534,20 @@ export function getAllMessages(_x15, _x16) { } function _getAllMessages() { - _getAllMessages = _asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee15(channelName, paths) { + _getAllMessages = _asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee14(channelName, paths) { var messagesPath, files; - return _regeneratorRuntime.wrap(function _callee15$(_context15) { + return _regeneratorRuntime.wrap(function _callee14$(_context14) { while (1) { - switch (_context15.prev = _context15.next) { + switch (_context14.prev = _context14.next) { case 0: paths = paths || getPaths(channelName); messagesPath = paths.messages; - _context15.next = 4; + _context14.next = 4; return readdir(messagesPath); case 4: - files = _context15.sent; - return _context15.abrupt("return", files.map(function (file) { + files = _context14.sent; + return _context14.abrupt("return", files.map(function (file) { var fileName = file.split('.')[0]; var split = fileName.split('_'); return { @@ -559,10 +560,10 @@ function _getAllMessages() { case 6: case "end": - return _context15.stop(); + return _context14.stop(); } } - }, _callee15); + }, _callee14); })); return _getAllMessages.apply(this, arguments); } @@ -586,14 +587,14 @@ export function cleanOldMessages(_x17, _x18) { } function _cleanOldMessages() { - _cleanOldMessages = _asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee16(messageObjects, ttl) { + _cleanOldMessages = _asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee15(messageObjects, ttl) { var olderThen; - return _regeneratorRuntime.wrap(function _callee16$(_context16) { + return _regeneratorRuntime.wrap(function _callee15$(_context15) { while (1) { - switch (_context16.prev = _context16.next) { + switch (_context15.prev = _context15.next) { case 0: olderThen = Date.now() - ttl; - _context16.next = 3; + _context15.next = 3; return Promise.all(messageObjects.filter(function (obj) { return obj.time / 1000 < olderThen; }).map(function (obj) { @@ -604,10 +605,10 @@ function _cleanOldMessages() { case 3: case "end": - return _context16.stop(); + return _context15.stop(); } } - }, _callee16); + }, _callee15); })); return _cleanOldMessages.apply(this, arguments); } @@ -623,23 +624,23 @@ export function create(_x19) { } function _create() { - _create = _asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee17(channelName) { + _create = _asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee16(channelName) { var options, time, paths, ensureFolderExistsPromise, uuid, state, - _yield$Promise$all2, + _yield$Promise$all, socketEE, infoFilePath, - _args17 = arguments; + _args16 = arguments; - return _regeneratorRuntime.wrap(function _callee17$(_context17) { + return _regeneratorRuntime.wrap(function _callee16$(_context16) { while (1) { - switch (_context17.prev = _context17.next) { + switch (_context16.prev = _context16.next) { case 0: - options = _args17.length > 1 && _args17[1] !== undefined ? _args17[1] : {}; + options = _args16.length > 1 && _args16[1] !== undefined ? _args16[1] : {}; options = fillOptionsWithDefaults(options); time = microSeconds(); paths = getPaths(channelName); @@ -653,6 +654,15 @@ function _create() { paths: paths, // contains all messages that have been emitted before emittedMessagesIds: new ObliviousSet(options.node.ttl * 2), + + /** + * Used to ensure we do not write too many files at once + * which could throw an error. + * Must always be smaller then options.node.maxParallelWrites + */ + writeFileQueue: new PQueue({ + concurrency: options.node.maxParallelWrites + }), messagesCallbackTime: null, messagesCallback: null, // ensures we do not read messages in parrallel @@ -666,17 +676,17 @@ function _create() { }; if (!OTHER_INSTANCES[channelName]) OTHER_INSTANCES[channelName] = []; OTHER_INSTANCES[channelName].push(state); - _context17.next = 11; + _context16.next = 11; return ensureFolderExistsPromise; case 11: - _context17.next = 13; + _context16.next = 13; return Promise.all([createSocketEventEmitter(channelName, uuid, paths), createSocketInfoFile(channelName, uuid, paths), refreshReaderClients(state)]); case 13: - _yield$Promise$all2 = _context17.sent; - socketEE = _yield$Promise$all2[0]; - infoFilePath = _yield$Promise$all2[1]; + _yield$Promise$all = _context16.sent; + socketEE = _yield$Promise$all[0]; + infoFilePath = _yield$Promise$all[1]; state.socketEE = socketEE; state.infoFilePath = infoFilePath; // when new message comes in, we read it and emit it @@ -695,14 +705,14 @@ function _create() { } }); }); - return _context17.abrupt("return", state); + return _context16.abrupt("return", state); case 20: case "end": - return _context17.stop(); + return _context16.stop(); } } - }, _callee17); + }, _callee16); })); return _create.apply(this, arguments); } @@ -735,31 +745,31 @@ export function handleMessagePing(_x20, _x21) { */ function _handleMessagePing() { - _handleMessagePing = _asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee18(state, msgObj) { + _handleMessagePing = _asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee17(state, msgObj) { var messages, useMessages; - return _regeneratorRuntime.wrap(function _callee18$(_context18) { + return _regeneratorRuntime.wrap(function _callee17$(_context17) { while (1) { - switch (_context18.prev = _context18.next) { + switch (_context17.prev = _context17.next) { case 0: if (state.messagesCallback) { - _context18.next = 2; + _context17.next = 2; break; } - return _context18.abrupt("return"); + return _context17.abrupt("return"); case 2: if (msgObj) { - _context18.next = 8; + _context17.next = 8; break; } - _context18.next = 5; + _context17.next = 5; return getAllMessages(state.channelName, state.paths); case 5: - messages = _context18.sent; - _context18.next = 9; + messages = _context17.sent; + _context17.next = 9; break; case 8: @@ -775,14 +785,14 @@ function _handleMessagePing() { // if no listener or message, so not do anything if (!(!useMessages.length || !state.messagesCallback)) { - _context18.next = 12; + _context17.next = 12; break; } - return _context18.abrupt("return"); + return _context17.abrupt("return"); case 12: - _context18.next = 14; + _context17.next = 14; return Promise.all(useMessages.map(function (msgObj) { return readMessage(msgObj).then(function (content) { return msgObj.content = content; @@ -801,10 +811,10 @@ function _handleMessagePing() { case 15: case "end": - return _context18.stop(); + return _context17.stop(); } } - }, _callee18); + }, _callee17); })); return _handleMessagePing.apply(this, arguments); } @@ -912,67 +922,88 @@ export function refreshReaderClients(channelState) { * @return {Promise} */ -export function postMessage(channelState, messageJson) { - var writePromise = writeMessage(channelState.channelName, channelState.uuid, messageJson, channelState.paths); - channelState.writeBlockPromise = channelState.writeBlockPromise.then( /*#__PURE__*/_asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee3() { - var _yield$Promise$all, msgObj, pingStr, writeToReadersPromise; +export function postMessage(_x24, _x25) { + return _postMessage.apply(this, arguments); +} +/** + * When multiple BroadcastChannels with the same name + * are created in a single node-process, we can access them directly and emit messages. + * This might not happen often in production + * but will speed up things when this module is used in unit-tests. + */ - return _regeneratorRuntime.wrap(function _callee3$(_context3) { +function _postMessage() { + _postMessage = _asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee19(channelState, messageJson) { + var writePromise; + return _regeneratorRuntime.wrap(function _callee19$(_context19) { while (1) { - switch (_context3.prev = _context3.next) { + switch (_context19.prev = _context19.next) { case 0: - _context3.next = 2; - return new Promise(function (res) { - return setTimeout(res, 0); + writePromise = channelState.writeFileQueue.add(function () { + return writeMessage(channelState.channelName, channelState.uuid, messageJson, channelState.paths); }); + channelState.writeBlockPromise = channelState.writeBlockPromise.then( /*#__PURE__*/_asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee18() { + var _yield$Promise$all2, msgObj, pingStr, writeToReadersPromise; + + return _regeneratorRuntime.wrap(function _callee18$(_context18) { + while (1) { + switch (_context18.prev = _context18.next) { + case 0: + _context18.next = 2; + return new Promise(function (res) { + return setTimeout(res, 0); + }); + + case 2: + _context18.next = 4; + return Promise.all([writePromise, refreshReaderClients(channelState)]); + + case 4: + _yield$Promise$all2 = _context18.sent; + msgObj = _yield$Promise$all2[0]; + emitOverFastPath(channelState, msgObj, messageJson); + pingStr = '{"t":' + msgObj.time + ',"u":"' + msgObj.uuid + '","to":"' + msgObj.token + '"}|'; + writeToReadersPromise = Promise.all(Object.values(channelState.otherReaderClients).filter(function (client) { + return client.writable; + }) // client might have closed in between + .map(function (client) { + return new Promise(function (res) { + client.write(pingStr, res); + }); + })); + /** + * clean up old messages + * to not waste resources on cleaning up, + * only if random-int matches, we clean up old messages + */ + + if (randomInt(0, 20) === 0) { + /* await */ + getAllMessages(channelState.channelName, channelState.paths).then(function (allMessages) { + return cleanOldMessages(allMessages, channelState.options.node.ttl); + }); + } - case 2: - _context3.next = 4; - return Promise.all([writePromise, refreshReaderClients(channelState)]); - - case 4: - _yield$Promise$all = _context3.sent; - msgObj = _yield$Promise$all[0]; - emitOverFastPath(channelState, msgObj, messageJson); - pingStr = '{"t":' + msgObj.time + ',"u":"' + msgObj.uuid + '","to":"' + msgObj.token + '"}|'; - writeToReadersPromise = Promise.all(Object.values(channelState.otherReaderClients).filter(function (client) { - return client.writable; - }) // client might have closed in between - .map(function (client) { - return new Promise(function (res) { - client.write(pingStr, res); - }); - })); - /** - * clean up old messages - * to not waste resources on cleaning up, - * only if random-int matches, we clean up old messages - */ - - if (randomInt(0, 20) === 0) { - /* await */ - getAllMessages(channelState.channelName, channelState.paths).then(function (allMessages) { - return cleanOldMessages(allMessages, channelState.options.node.ttl); - }); - } + return _context18.abrupt("return", writeToReadersPromise); - return _context3.abrupt("return", writeToReadersPromise); + case 11: + case "end": + return _context18.stop(); + } + } + }, _callee18); + }))); + return _context19.abrupt("return", channelState.writeBlockPromise); - case 11: + case 3: case "end": - return _context3.stop(); + return _context19.stop(); } } - }, _callee3); - }))); - return channelState.writeBlockPromise; + }, _callee19); + })); + return _postMessage.apply(this, arguments); } -/** - * When multiple BroadcastChannels with the same name - * are created in a single node-process, we can access them directly and emit messages. - * This might not happen often in production - * but will speed up things when this module is used in unit-tests. - */ export function emitOverFastPath(state, msgObj, messageJson) { if (!state.options.node.useFastPath) { diff --git a/dist/esbrowser/options.js b/dist/esbrowser/options.js index d5354bee..fbcc5244 100644 --- a/dist/esbrowser/options.js +++ b/dist/esbrowser/options.js @@ -19,6 +19,12 @@ export function fillOptionsWithDefaults() { if (!options.node) options.node = {}; if (!options.node.ttl) options.node.ttl = 1000 * 60 * 2; // 2 minutes; + /** + * On linux use 'ulimit -Hn' to get the limit of open files. + * On ubuntu this was 4096 for me, so we use half of that as maxParallelWrites default. + */ + + if (!options.node.maxParallelWrites) options.node.maxParallelWrites = 2048; if (typeof options.node.useFastPath === 'undefined') options.node.useFastPath = true; return options; } \ No newline at end of file diff --git a/dist/esnode/methods/node.js b/dist/esnode/methods/node.js index 880bf616..3ce81b85 100644 --- a/dist/esnode/methods/node.js +++ b/dist/esnode/methods/node.js @@ -15,6 +15,7 @@ import path from 'path'; import micro from 'nano-time'; import rimraf from 'rimraf'; import isNode from 'detect-node'; +import PQueue from 'p-queue'; import { add as unloadAdd } from 'unload'; import { fillOptionsWithDefaults } from '../options.js'; import { randomInt, randomToken, PROMISE_RESOLVED_VOID } from '../util.js'; @@ -81,10 +82,10 @@ function ensureBaseFolderExists() { } function _ensureBaseFolderExists() { - _ensureBaseFolderExists = _asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee4() { - return _regeneratorRuntime.wrap(function _callee4$(_context4) { + _ensureBaseFolderExists = _asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee3() { + return _regeneratorRuntime.wrap(function _callee3$(_context3) { while (1) { - switch (_context4.prev = _context4.next) { + switch (_context3.prev = _context3.next) { case 0: if (!ENSURE_BASE_FOLDER_EXISTS_PROMISE) { ENSURE_BASE_FOLDER_EXISTS_PROMISE = mkdir(TMP_FOLDER_BASE)["catch"](function () { @@ -92,14 +93,14 @@ function _ensureBaseFolderExists() { }); } - return _context4.abrupt("return", ENSURE_BASE_FOLDER_EXISTS_PROMISE); + return _context3.abrupt("return", ENSURE_BASE_FOLDER_EXISTS_PROMISE); case 2: case "end": - return _context4.stop(); + return _context3.stop(); } } - }, _callee4); + }, _callee3); })); return _ensureBaseFolderExists.apply(this, arguments); } @@ -113,24 +114,24 @@ export function ensureFoldersExist(_x, _x2) { */ function _ensureFoldersExist() { - _ensureFoldersExist = _asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee5(channelName, paths) { + _ensureFoldersExist = _asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee4(channelName, paths) { var chmodValue; - return _regeneratorRuntime.wrap(function _callee5$(_context5) { + return _regeneratorRuntime.wrap(function _callee4$(_context4) { while (1) { - switch (_context5.prev = _context5.next) { + switch (_context4.prev = _context4.next) { case 0: paths = paths || getPaths(channelName); - _context5.next = 3; + _context4.next = 3; return ensureBaseFolderExists(); case 3: - _context5.next = 5; + _context4.next = 5; return mkdir(paths.channelBase)["catch"](function () { return null; }); case 5: - _context5.next = 7; + _context4.next = 7; return Promise.all([mkdir(paths.readers)["catch"](function () { return null; }), mkdir(paths.messages)["catch"](function () { @@ -140,17 +141,17 @@ function _ensureFoldersExist() { case 7: // set permissions so other users can use the same channel chmodValue = '777'; - _context5.next = 10; + _context4.next = 10; return Promise.all([chmod(paths.channelBase, chmodValue), chmod(paths.readers, chmodValue), chmod(paths.messages, chmodValue)])["catch"](function () { return null; }); case 10: case "end": - return _context5.stop(); + return _context4.stop(); } } - }, _callee5); + }, _callee4); })); return _ensureFoldersExist.apply(this, arguments); } @@ -160,13 +161,13 @@ export function clearNodeFolder() { } function _clearNodeFolder() { - _clearNodeFolder = _asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee6() { - return _regeneratorRuntime.wrap(function _callee6$(_context6) { + _clearNodeFolder = _asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee5() { + return _regeneratorRuntime.wrap(function _callee5$(_context5) { while (1) { - switch (_context6.prev = _context6.next) { + switch (_context5.prev = _context5.next) { case 0: if (!(!TMP_FOLDER_BASE || TMP_FOLDER_BASE === '' || TMP_FOLDER_BASE === '/')) { - _context6.next = 2; + _context5.next = 2; break; } @@ -174,19 +175,19 @@ function _clearNodeFolder() { case 2: ENSURE_BASE_FOLDER_EXISTS_PROMISE = null; - _context6.next = 5; + _context5.next = 5; return removeDir(TMP_FOLDER_BASE); case 5: ENSURE_BASE_FOLDER_EXISTS_PROMISE = null; - return _context6.abrupt("return", true); + return _context5.abrupt("return", true); case 7: case "end": - return _context6.stop(); + return _context5.stop(); } } - }, _callee6); + }, _callee5); })); return _clearNodeFolder.apply(this, arguments); } @@ -225,29 +226,29 @@ export function countChannelFolders() { } function _countChannelFolders() { - _countChannelFolders = _asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee7() { + _countChannelFolders = _asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee6() { var folders; - return _regeneratorRuntime.wrap(function _callee7$(_context7) { + return _regeneratorRuntime.wrap(function _callee6$(_context6) { while (1) { - switch (_context7.prev = _context7.next) { + switch (_context6.prev = _context6.next) { case 0: - _context7.next = 2; + _context6.next = 2; return ensureBaseFolderExists(); case 2: - _context7.next = 4; + _context6.next = 4; return readdir(TMP_FOLDER_BASE); case 4: - folders = _context7.sent; - return _context7.abrupt("return", folders.length); + folders = _context6.sent; + return _context6.abrupt("return", folders.length); case 6: case "end": - return _context7.stop(); + return _context6.stop(); } } - }, _callee7); + }, _callee6); })); return _countChannelFolders.apply(this, arguments); } @@ -262,42 +263,42 @@ function connectionError(_x3) { function _connectionError() { - _connectionError = _asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee8(originalError) { + _connectionError = _asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee7(originalError) { var count, addObj, text, newError; - return _regeneratorRuntime.wrap(function _callee8$(_context8) { + return _regeneratorRuntime.wrap(function _callee7$(_context7) { while (1) { - switch (_context8.prev = _context8.next) { + switch (_context7.prev = _context7.next) { case 0: - _context8.next = 2; + _context7.next = 2; return countChannelFolders(); case 2: - count = _context8.sent; + count = _context7.sent; if (!(count < 30)) { - _context8.next = 5; + _context7.next = 5; break; } - return _context8.abrupt("return", originalError); + return _context7.abrupt("return", originalError); case 5: addObj = {}; - Object.entries(originalError).forEach(function (_ref4) { - var k = _ref4[0], - v = _ref4[1]; + Object.entries(originalError).forEach(function (_ref3) { + var k = _ref3[0], + v = _ref3[1]; return addObj[k] = v; }); text = 'BroadcastChannel.create(): error: ' + 'This might happen if you have created to many channels, ' + 'like when you use BroadcastChannel in unit-tests.' + 'Try using BroadcastChannel.clearNodeFolder() to clear the tmp-folder before each test.' + 'See https://github.com/pubkey/broadcast-channel#clear-tmp-folder'; newError = new Error(text + ': ' + JSON.stringify(addObj, null, 2)); - return _context8.abrupt("return", newError); + return _context7.abrupt("return", newError); case 10: case "end": - return _context8.stop(); + return _context7.stop(); } } - }, _callee8); + }, _callee7); })); return _connectionError.apply(this, arguments); } @@ -307,11 +308,11 @@ export function createSocketEventEmitter(_x4, _x5, _x6) { } function _createSocketEventEmitter() { - _createSocketEventEmitter = _asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee11(channelName, readerUuid, paths) { + _createSocketEventEmitter = _asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee10(channelName, readerUuid, paths) { var pathToSocket, emitter, server; - return _regeneratorRuntime.wrap(function _callee11$(_context11) { + return _regeneratorRuntime.wrap(function _callee10$(_context10) { while (1) { - switch (_context11.prev = _context11.next) { + switch (_context10.prev = _context10.next) { case 0: pathToSocket = socketPath(channelName, readerUuid, paths); emitter = new events.EventEmitter(); @@ -321,53 +322,53 @@ function _createSocketEventEmitter() { emitter.emit('data', msg.toString()); }); }); - _context11.next = 5; + _context10.next = 5; return new Promise(function (resolve, reject) { server.on('error', /*#__PURE__*/function () { - var _ref5 = _asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee9(err) { + var _ref4 = _asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee8(err) { var useErr; - return _regeneratorRuntime.wrap(function _callee9$(_context9) { + return _regeneratorRuntime.wrap(function _callee8$(_context8) { while (1) { - switch (_context9.prev = _context9.next) { + switch (_context8.prev = _context8.next) { case 0: - _context9.next = 2; + _context8.next = 2; return connectionError(err); case 2: - useErr = _context9.sent; + useErr = _context8.sent; reject(useErr); case 4: case "end": - return _context9.stop(); + return _context8.stop(); } } - }, _callee9); + }, _callee8); })); - return function (_x24) { - return _ref5.apply(this, arguments); + return function (_x26) { + return _ref4.apply(this, arguments); }; }()); server.listen(pathToSocket, /*#__PURE__*/function () { - var _ref6 = _asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee10(err, res) { + var _ref5 = _asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee9(err, res) { var useErr; - return _regeneratorRuntime.wrap(function _callee10$(_context10) { + return _regeneratorRuntime.wrap(function _callee9$(_context9) { while (1) { - switch (_context10.prev = _context10.next) { + switch (_context9.prev = _context9.next) { case 0: if (!err) { - _context10.next = 7; + _context9.next = 7; break; } - _context10.next = 3; + _context9.next = 3; return connectionError(err); case 3: - useErr = _context10.sent; + useErr = _context9.sent; reject(useErr); - _context10.next = 8; + _context9.next = 8; break; case 7: @@ -375,20 +376,20 @@ function _createSocketEventEmitter() { case 8: case "end": - return _context10.stop(); + return _context9.stop(); } } - }, _callee10); + }, _callee9); })); - return function (_x25, _x26) { - return _ref6.apply(this, arguments); + return function (_x27, _x28) { + return _ref5.apply(this, arguments); }; }()); }); case 5: - return _context11.abrupt("return", { + return _context10.abrupt("return", { path: pathToSocket, emitter: emitter, server: server @@ -396,10 +397,10 @@ function _createSocketEventEmitter() { case 6: case "end": - return _context11.stop(); + return _context10.stop(); } } - }, _callee11); + }, _callee10); })); return _createSocketEventEmitter.apply(this, arguments); } @@ -414,15 +415,15 @@ export function openClientConnection(_x7, _x8) { */ function _openClientConnection() { - _openClientConnection = _asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee12(channelName, readerUuid) { + _openClientConnection = _asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee11(channelName, readerUuid) { var pathToSocket, client; - return _regeneratorRuntime.wrap(function _callee12$(_context12) { + return _regeneratorRuntime.wrap(function _callee11$(_context11) { while (1) { - switch (_context12.prev = _context12.next) { + switch (_context11.prev = _context11.next) { case 0: pathToSocket = socketPath(channelName, readerUuid); client = new net.Socket(); - return _context12.abrupt("return", new Promise(function (res, rej) { + return _context11.abrupt("return", new Promise(function (res, rej) { client.connect(pathToSocket, function () { return res(client); }); @@ -433,10 +434,10 @@ function _openClientConnection() { case 3: case "end": - return _context12.stop(); + return _context11.stop(); } } - }, _callee12); + }, _callee11); })); return _openClientConnection.apply(this, arguments); } @@ -471,20 +472,20 @@ export function getReadersUuids(_x9, _x10) { } function _getReadersUuids() { - _getReadersUuids = _asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee13(channelName, paths) { + _getReadersUuids = _asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee12(channelName, paths) { var readersPath, files; - return _regeneratorRuntime.wrap(function _callee13$(_context13) { + return _regeneratorRuntime.wrap(function _callee12$(_context12) { while (1) { - switch (_context13.prev = _context13.next) { + switch (_context12.prev = _context12.next) { case 0: paths = paths || getPaths(channelName); readersPath = paths.readers; - _context13.next = 4; + _context12.next = 4; return readdir(readersPath); case 4: - files = _context13.sent; - return _context13.abrupt("return", files.map(function (file) { + files = _context12.sent; + return _context12.abrupt("return", files.map(function (file) { return file.split('.'); }).filter(function (split) { return split[1] === 'json'; @@ -495,10 +496,10 @@ function _getReadersUuids() { case 6: case "end": - return _context13.stop(); + return _context12.stop(); } } - }, _callee13); + }, _callee12); })); return _getReadersUuids.apply(this, arguments); } @@ -508,22 +509,22 @@ export function messagePath(_x11, _x12, _x13, _x14) { } function _messagePath() { - _messagePath = _asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee14(channelName, time, token, writerUuid) { + _messagePath = _asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee13(channelName, time, token, writerUuid) { var fileName, msgPath; - return _regeneratorRuntime.wrap(function _callee14$(_context14) { + return _regeneratorRuntime.wrap(function _callee13$(_context13) { while (1) { - switch (_context14.prev = _context14.next) { + switch (_context13.prev = _context13.next) { case 0: fileName = time + '_' + writerUuid + '_' + token + '.json'; msgPath = path.join(getPaths(channelName).messages, fileName); - return _context14.abrupt("return", msgPath); + return _context13.abrupt("return", msgPath); case 3: case "end": - return _context14.stop(); + return _context13.stop(); } } - }, _callee14); + }, _callee13); })); return _messagePath.apply(this, arguments); } @@ -533,20 +534,20 @@ export function getAllMessages(_x15, _x16) { } function _getAllMessages() { - _getAllMessages = _asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee15(channelName, paths) { + _getAllMessages = _asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee14(channelName, paths) { var messagesPath, files; - return _regeneratorRuntime.wrap(function _callee15$(_context15) { + return _regeneratorRuntime.wrap(function _callee14$(_context14) { while (1) { - switch (_context15.prev = _context15.next) { + switch (_context14.prev = _context14.next) { case 0: paths = paths || getPaths(channelName); messagesPath = paths.messages; - _context15.next = 4; + _context14.next = 4; return readdir(messagesPath); case 4: - files = _context15.sent; - return _context15.abrupt("return", files.map(function (file) { + files = _context14.sent; + return _context14.abrupt("return", files.map(function (file) { var fileName = file.split('.')[0]; var split = fileName.split('_'); return { @@ -559,10 +560,10 @@ function _getAllMessages() { case 6: case "end": - return _context15.stop(); + return _context14.stop(); } } - }, _callee15); + }, _callee14); })); return _getAllMessages.apply(this, arguments); } @@ -586,14 +587,14 @@ export function cleanOldMessages(_x17, _x18) { } function _cleanOldMessages() { - _cleanOldMessages = _asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee16(messageObjects, ttl) { + _cleanOldMessages = _asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee15(messageObjects, ttl) { var olderThen; - return _regeneratorRuntime.wrap(function _callee16$(_context16) { + return _regeneratorRuntime.wrap(function _callee15$(_context15) { while (1) { - switch (_context16.prev = _context16.next) { + switch (_context15.prev = _context15.next) { case 0: olderThen = Date.now() - ttl; - _context16.next = 3; + _context15.next = 3; return Promise.all(messageObjects.filter(function (obj) { return obj.time / 1000 < olderThen; }).map(function (obj) { @@ -604,10 +605,10 @@ function _cleanOldMessages() { case 3: case "end": - return _context16.stop(); + return _context15.stop(); } } - }, _callee16); + }, _callee15); })); return _cleanOldMessages.apply(this, arguments); } @@ -623,23 +624,23 @@ export function create(_x19) { } function _create() { - _create = _asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee17(channelName) { + _create = _asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee16(channelName) { var options, time, paths, ensureFolderExistsPromise, uuid, state, - _yield$Promise$all2, + _yield$Promise$all, socketEE, infoFilePath, - _args17 = arguments; + _args16 = arguments; - return _regeneratorRuntime.wrap(function _callee17$(_context17) { + return _regeneratorRuntime.wrap(function _callee16$(_context16) { while (1) { - switch (_context17.prev = _context17.next) { + switch (_context16.prev = _context16.next) { case 0: - options = _args17.length > 1 && _args17[1] !== undefined ? _args17[1] : {}; + options = _args16.length > 1 && _args16[1] !== undefined ? _args16[1] : {}; options = fillOptionsWithDefaults(options); time = microSeconds(); paths = getPaths(channelName); @@ -653,6 +654,15 @@ function _create() { paths: paths, // contains all messages that have been emitted before emittedMessagesIds: new ObliviousSet(options.node.ttl * 2), + + /** + * Used to ensure we do not write too many files at once + * which could throw an error. + * Must always be smaller then options.node.maxParallelWrites + */ + writeFileQueue: new PQueue({ + concurrency: options.node.maxParallelWrites + }), messagesCallbackTime: null, messagesCallback: null, // ensures we do not read messages in parrallel @@ -666,17 +676,17 @@ function _create() { }; if (!OTHER_INSTANCES[channelName]) OTHER_INSTANCES[channelName] = []; OTHER_INSTANCES[channelName].push(state); - _context17.next = 11; + _context16.next = 11; return ensureFolderExistsPromise; case 11: - _context17.next = 13; + _context16.next = 13; return Promise.all([createSocketEventEmitter(channelName, uuid, paths), createSocketInfoFile(channelName, uuid, paths), refreshReaderClients(state)]); case 13: - _yield$Promise$all2 = _context17.sent; - socketEE = _yield$Promise$all2[0]; - infoFilePath = _yield$Promise$all2[1]; + _yield$Promise$all = _context16.sent; + socketEE = _yield$Promise$all[0]; + infoFilePath = _yield$Promise$all[1]; state.socketEE = socketEE; state.infoFilePath = infoFilePath; // when new message comes in, we read it and emit it @@ -695,14 +705,14 @@ function _create() { } }); }); - return _context17.abrupt("return", state); + return _context16.abrupt("return", state); case 20: case "end": - return _context17.stop(); + return _context16.stop(); } } - }, _callee17); + }, _callee16); })); return _create.apply(this, arguments); } @@ -735,31 +745,31 @@ export function handleMessagePing(_x20, _x21) { */ function _handleMessagePing() { - _handleMessagePing = _asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee18(state, msgObj) { + _handleMessagePing = _asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee17(state, msgObj) { var messages, useMessages; - return _regeneratorRuntime.wrap(function _callee18$(_context18) { + return _regeneratorRuntime.wrap(function _callee17$(_context17) { while (1) { - switch (_context18.prev = _context18.next) { + switch (_context17.prev = _context17.next) { case 0: if (state.messagesCallback) { - _context18.next = 2; + _context17.next = 2; break; } - return _context18.abrupt("return"); + return _context17.abrupt("return"); case 2: if (msgObj) { - _context18.next = 8; + _context17.next = 8; break; } - _context18.next = 5; + _context17.next = 5; return getAllMessages(state.channelName, state.paths); case 5: - messages = _context18.sent; - _context18.next = 9; + messages = _context17.sent; + _context17.next = 9; break; case 8: @@ -775,14 +785,14 @@ function _handleMessagePing() { // if no listener or message, so not do anything if (!(!useMessages.length || !state.messagesCallback)) { - _context18.next = 12; + _context17.next = 12; break; } - return _context18.abrupt("return"); + return _context17.abrupt("return"); case 12: - _context18.next = 14; + _context17.next = 14; return Promise.all(useMessages.map(function (msgObj) { return readMessage(msgObj).then(function (content) { return msgObj.content = content; @@ -801,10 +811,10 @@ function _handleMessagePing() { case 15: case "end": - return _context18.stop(); + return _context17.stop(); } } - }, _callee18); + }, _callee17); })); return _handleMessagePing.apply(this, arguments); } @@ -912,67 +922,88 @@ export function refreshReaderClients(channelState) { * @return {Promise} */ -export function postMessage(channelState, messageJson) { - var writePromise = writeMessage(channelState.channelName, channelState.uuid, messageJson, channelState.paths); - channelState.writeBlockPromise = channelState.writeBlockPromise.then( /*#__PURE__*/_asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee3() { - var _yield$Promise$all, msgObj, pingStr, writeToReadersPromise; +export function postMessage(_x24, _x25) { + return _postMessage.apply(this, arguments); +} +/** + * When multiple BroadcastChannels with the same name + * are created in a single node-process, we can access them directly and emit messages. + * This might not happen often in production + * but will speed up things when this module is used in unit-tests. + */ - return _regeneratorRuntime.wrap(function _callee3$(_context3) { +function _postMessage() { + _postMessage = _asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee19(channelState, messageJson) { + var writePromise; + return _regeneratorRuntime.wrap(function _callee19$(_context19) { while (1) { - switch (_context3.prev = _context3.next) { + switch (_context19.prev = _context19.next) { case 0: - _context3.next = 2; - return new Promise(function (res) { - return setTimeout(res, 0); + writePromise = channelState.writeFileQueue.add(function () { + return writeMessage(channelState.channelName, channelState.uuid, messageJson, channelState.paths); }); + channelState.writeBlockPromise = channelState.writeBlockPromise.then( /*#__PURE__*/_asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee18() { + var _yield$Promise$all2, msgObj, pingStr, writeToReadersPromise; + + return _regeneratorRuntime.wrap(function _callee18$(_context18) { + while (1) { + switch (_context18.prev = _context18.next) { + case 0: + _context18.next = 2; + return new Promise(function (res) { + return setTimeout(res, 0); + }); + + case 2: + _context18.next = 4; + return Promise.all([writePromise, refreshReaderClients(channelState)]); + + case 4: + _yield$Promise$all2 = _context18.sent; + msgObj = _yield$Promise$all2[0]; + emitOverFastPath(channelState, msgObj, messageJson); + pingStr = '{"t":' + msgObj.time + ',"u":"' + msgObj.uuid + '","to":"' + msgObj.token + '"}|'; + writeToReadersPromise = Promise.all(Object.values(channelState.otherReaderClients).filter(function (client) { + return client.writable; + }) // client might have closed in between + .map(function (client) { + return new Promise(function (res) { + client.write(pingStr, res); + }); + })); + /** + * clean up old messages + * to not waste resources on cleaning up, + * only if random-int matches, we clean up old messages + */ + + if (randomInt(0, 20) === 0) { + /* await */ + getAllMessages(channelState.channelName, channelState.paths).then(function (allMessages) { + return cleanOldMessages(allMessages, channelState.options.node.ttl); + }); + } - case 2: - _context3.next = 4; - return Promise.all([writePromise, refreshReaderClients(channelState)]); - - case 4: - _yield$Promise$all = _context3.sent; - msgObj = _yield$Promise$all[0]; - emitOverFastPath(channelState, msgObj, messageJson); - pingStr = '{"t":' + msgObj.time + ',"u":"' + msgObj.uuid + '","to":"' + msgObj.token + '"}|'; - writeToReadersPromise = Promise.all(Object.values(channelState.otherReaderClients).filter(function (client) { - return client.writable; - }) // client might have closed in between - .map(function (client) { - return new Promise(function (res) { - client.write(pingStr, res); - }); - })); - /** - * clean up old messages - * to not waste resources on cleaning up, - * only if random-int matches, we clean up old messages - */ - - if (randomInt(0, 20) === 0) { - /* await */ - getAllMessages(channelState.channelName, channelState.paths).then(function (allMessages) { - return cleanOldMessages(allMessages, channelState.options.node.ttl); - }); - } + return _context18.abrupt("return", writeToReadersPromise); - return _context3.abrupt("return", writeToReadersPromise); + case 11: + case "end": + return _context18.stop(); + } + } + }, _callee18); + }))); + return _context19.abrupt("return", channelState.writeBlockPromise); - case 11: + case 3: case "end": - return _context3.stop(); + return _context19.stop(); } } - }, _callee3); - }))); - return channelState.writeBlockPromise; + }, _callee19); + })); + return _postMessage.apply(this, arguments); } -/** - * When multiple BroadcastChannels with the same name - * are created in a single node-process, we can access them directly and emit messages. - * This might not happen often in production - * but will speed up things when this module is used in unit-tests. - */ export function emitOverFastPath(state, msgObj, messageJson) { if (!state.options.node.useFastPath) { diff --git a/dist/esnode/options.js b/dist/esnode/options.js index d5354bee..fbcc5244 100644 --- a/dist/esnode/options.js +++ b/dist/esnode/options.js @@ -19,6 +19,12 @@ export function fillOptionsWithDefaults() { if (!options.node) options.node = {}; if (!options.node.ttl) options.node.ttl = 1000 * 60 * 2; // 2 minutes; + /** + * On linux use 'ulimit -Hn' to get the limit of open files. + * On ubuntu this was 4096 for me, so we use half of that as maxParallelWrites default. + */ + + if (!options.node.maxParallelWrites) options.node.maxParallelWrites = 2048; if (typeof options.node.useFastPath === 'undefined') options.node.useFastPath = true; return options; } \ No newline at end of file diff --git a/dist/lib/browser.js b/dist/lib/browser.js index 66d38400..dad98917 100644 --- a/dist/lib/browser.js +++ b/dist/lib/browser.js @@ -1554,6 +1554,12 @@ function fillOptionsWithDefaults() { if (!options.node) options.node = {}; if (!options.node.ttl) options.node.ttl = 1000 * 60 * 2; // 2 minutes; + /** + * On linux use 'ulimit -Hn' to get the limit of open files. + * On ubuntu this was 4096 for me, so we use half of that as maxParallelWrites default. + */ + + if (!options.node.maxParallelWrites) options.node.maxParallelWrites = 2048; if (typeof options.node.useFastPath === 'undefined') options.node.useFastPath = true; return options; } diff --git a/dist/lib/browser.min.js b/dist/lib/browser.min.js index 13b802fa..a2292783 100644 --- a/dist/lib/browser.min.js +++ b/dist/lib/browser.min.js @@ -1 +1 @@ -!function r(o,i,s){function a(t,e){if(!i[t]){if(!o[t]){var n="function"==typeof require&&require;if(!e&&n)return n(t,!0);if(u)return u(t,!0);throw(n=new Error("Cannot find module '"+t+"'")).code="MODULE_NOT_FOUND",n}n=i[t]={exports:{}},o[t][0].call(n.exports,function(e){return a(o[t][1][e]||e)},n,n.exports,r,o,i,s)}return i[t].exports}for(var u="function"==typeof require&&require,e=0;e=e.time&&e.fn(t.data)})},n=e.method.microSeconds(),e._prepP?e._prepP.then(function(){e._iL=!0,e.method.onMessage(e._state,t,n)}):(e._iL=!0,e.method.onMessage(e._state,t,n)))}}(e)}function h(e,t,n){e._addEL[t]=e._addEL[t].filter(function(e){return e!==n}),function(e){{var t;e._iL&&!d(e)&&(e._iL=!1,t=e.method.microSeconds(),e.method.onMessage(e._state,null,t))}}(e)}(n.BroadcastChannel=a)._pubkey=!0,a.prototype={postMessage:function(e){if(this.closed)throw new Error("BroadcastChannel.postMessage(): Cannot post message after channel has closed");return l(this,"message",e)},postInternal:function(e){return l(this,"internal",e)},set onmessage(e){var t={time:this.method.microSeconds(),fn:e};h(this,"message",this._onML),e&&"function"==typeof e?(this._onML=t,f(this,"message",t)):this._onML=null},addEventListener:function(e,t){var n=this.method.microSeconds();f(this,e,{time:n,fn:t})},removeEventListener:function(e,t){var n=this._addEL[e].find(function(e){return e.fn===t});h(this,e,n)},close:function(){var e=this;if(!this.closed){s.delete(this),this.closed=!0;var t=this._prepP||o.PROMISE_RESOLVED_VOID;return this._onML=null,this._addEL.message=[],t.then(function(){return Promise.all(Array.from(e._uMP))}).then(function(){return Promise.all(e._befC.map(function(e){return e()}))}).then(function(){return e.method.close(e._state)})}},get type(){return this.method.type},get isClosed(){return this.closed}}},{"./method-chooser.js":6,"./options.js":11,"./util.js":12}],2:[function(e,t,n){"use strict";var r=e("./index.es5.js"),e=r.BroadcastChannel,r=r.createLeaderElection;window.BroadcastChannel2=e,window.createLeaderElection=r},{"./index.es5.js":3}],3:[function(e,t,n){"use strict";e=e("./index.js");t.exports={BroadcastChannel:e.BroadcastChannel,createLeaderElection:e.createLeaderElection,clearNodeFolder:e.clearNodeFolder,enforceOptions:e.enforceOptions,beLeader:e.beLeader}},{"./index.js":4}],4:[function(e,t,n){"use strict";Object.defineProperty(n,"__esModule",{value:!0}),Object.defineProperty(n,"BroadcastChannel",{enumerable:!0,get:function(){return r.BroadcastChannel}}),Object.defineProperty(n,"OPEN_BROADCAST_CHANNELS",{enumerable:!0,get:function(){return r.OPEN_BROADCAST_CHANNELS}}),Object.defineProperty(n,"beLeader",{enumerable:!0,get:function(){return o.beLeader}}),Object.defineProperty(n,"clearNodeFolder",{enumerable:!0,get:function(){return r.clearNodeFolder}}),Object.defineProperty(n,"createLeaderElection",{enumerable:!0,get:function(){return o.createLeaderElection}}),Object.defineProperty(n,"enforceOptions",{enumerable:!0,get:function(){return r.enforceOptions}});var r=e("./broadcast-channel"),o=e("./leader-election")},{"./broadcast-channel":1,"./leader-election":5}],5:[function(e,t,n){"use strict";Object.defineProperty(n,"__esModule",{value:!0}),n.beLeader=u,n.createLeaderElection=function(e,t){if(e._leaderElector)throw new Error("BroadcastChannel already has a leader-elector");t=function(e,t){e=e||{};(e=JSON.parse(JSON.stringify(e))).fallbackInterval||(e.fallbackInterval=3e3);e.responseTime||(e.responseTime=t.method.averageResponseTime(t.options));return e}(t,e);var n=new o(e,t);return e._befC.push(function(){return n.die()}),e._leaderElector=n};var s=e("./util.js"),r=e("unload"),o=function(e,t){var n=this;this.broadcastChannel=e,this._options=t,this.isLeader=!1,this.hasLeader=!1,this.isDead=!1,this.token=(0,s.randomToken)(),this._aplQ=s.PROMISE_RESOLVED_VOID,this._aplQC=0,this._unl=[],this._lstns=[],this._invs=[],this._dpL=function(){},this._dpLC=!1;function r(e){"leader"===e.context&&("death"===e.action&&(n.hasLeader=!1),"tell"===e.action&&(n.hasLeader=!0))}this.broadcastChannel.addEventListener("internal",r),this._lstns.push(r)};function a(e,t){t={context:"leader",action:t,token:e.token};return e.broadcastChannel.postInternal(t)}function u(t){t.isLeader=!0,t.hasLeader=!0;var e=(0,r.add)(function(){return t.die()});t._unl.push(e);function n(e){"leader"===e.context&&"apply"===e.action&&a(t,"tell"),"leader"!==e.context||"tell"!==e.action||t._dpLC||(t._dpLC=!0,t._dpL(),a(t,"tell"))}return t.broadcastChannel.addEventListener("internal",n),t._lstns.push(n),a(t,"tell")}o.prototype={applyOnce:function(){var i=this;if(this.isLeader)return(0,s.sleep)(0,!0);if(this.isDead)return(0,s.sleep)(0,!1);if(1i.token&&t(),"tell"===e.action&&(t(),i.hasLeader=!0))}var t,n=!1,r=new Promise(function(e){t=function(){n=!0,e()}}),o=[];return i.broadcastChannel.addEventListener("internal",e),a(i,"apply").then(function(){return Promise.race([(0,s.sleep)(i._options.responseTime/2),r.then(function(){return Promise.reject(new Error)})])}).then(function(){return a(i,"apply")}).then(function(){return Promise.race([(0,s.sleep)(i._options.responseTime/2),r.then(function(){return Promise.reject(new Error)})])}).catch(function(){}).then(function(){return i.broadcastChannel.removeEventListener("internal",e),!n&&u(i).then(function(){return!0})})}return this._aplQC=this._aplQC+1,this._aplQ=this._aplQ.then(e).then(function(){i._aplQC=i._aplQC-1}),this._aplQ.then(function(){return i.isLeader})},awaitLeadership:function(){return this._aLP||(this._aLP=function(i){if(i.isLeader)return s.PROMISE_RESOLVED_VOID;return new Promise(function(e){var t=!1;function n(){t||(t=!0,clearInterval(r),i.broadcastChannel.removeEventListener("internal",o),e(!0))}i.applyOnce().then(function(){i.isLeader&&n()});var r=setInterval(function(){console.log("applyOnce via fallbackInterval"),i.applyOnce().then(function(){i.isLeader&&n()})},i._options.fallbackInterval);i._invs.push(r);var o=function(e){"leader"===e.context&&"death"===e.action&&(i.hasLeader=!1,i.applyOnce().then(function(){i.isLeader&&n()}))};i.broadcastChannel.addEventListener("internal",o),i._lstns.push(o)})}(this)),this._aLP},set onduplicate(e){this._dpL=e},die:function(){var t=this;return this._lstns.forEach(function(e){return t.broadcastChannel.removeEventListener("internal",e)}),this._lstns=[],this._invs.forEach(function(e){return clearInterval(e)}),this._invs=[],this._unl.forEach(function(e){return e.remove()}),this._unl=[],this.isLeader&&(this.hasLeader=!1,this.isLeader=!1),this.isDead=!0,a(this,"death")}}},{"./util.js":12,unload:20}],6:[function(e,t,n){"use strict";var r=e("@babel/runtime/helpers/interopRequireDefault");e("@babel/runtime/helpers/typeof");Object.defineProperty(n,"__esModule",{value:!0}),n.chooseMethod=function(t){var e=[].concat(t.methods,u).filter(Boolean);if(t.type){if("simulate"===t.type)return s.default;var n=e.find(function(e){return e.type===t.type});if(n)return n;throw new Error("method-type "+t.type+" not found")}t.webWorkerSupport||a.isNode||(e=e.filter(function(e){return"idb"!==e.type}));e=e.find(function(e){return e.canBeUsed()});{if(e)return e;throw new Error("No useable method found in "+JSON.stringify(u.map(function(e){return e.type})))}};var o=r(e("./methods/native.js")),i=r(e("./methods/indexed-db.js")),n=r(e("./methods/localstorage.js")),s=r(e("./methods/simulate.js")),a=e("./util");var u=[o.default,i.default,n.default]},{"./methods/indexed-db.js":7,"./methods/localstorage.js":8,"./methods/native.js":9,"./methods/simulate.js":10,"./util":12,"@babel/runtime/helpers/interopRequireDefault":13,"@babel/runtime/helpers/typeof":14}],7:[function(e,t,n){"use strict";Object.defineProperty(n,"__esModule",{value:!0}),n.averageResponseTime=w,n.canBeUsed=y,n.cleanOldMessages=p,n.close=b,n.create=m,n.createDatabase=c,n.default=void 0,n.getAllMessages=function(e){var n=e.transaction(a).objectStore(a),r=[];return new Promise(function(t){n.openCursor().onsuccess=function(e){e=e.target.result;e?(r.push(e.value),e.continue()):t(r)}})},n.getIdb=u,n.getMessagesHigherThan=d,n.getOldMessages=h,n.microSeconds=void 0,n.onMessage=g,n.postMessage=_,n.removeMessageById=f,n.type=void 0,n.writeMessage=l;var o=e("../util.js"),i=e("oblivious-set"),s=e("../options"),e=o.microSeconds;n.microSeconds=e;var r="pubkey.broadcast-channel-0-",a="messages";function u(){if("undefined"!=typeof indexedDB)return indexedDB;if("undefined"!=typeof window){if(void 0!==window.mozIndexedDB)return window.mozIndexedDB;if(void 0!==window.webkitIndexedDB)return window.webkitIndexedDB;if(void 0!==window.msIndexedDB)return window.msIndexedDB}return!1}function c(e){var n=u().open(r+e,1);return n.onupgradeneeded=function(e){e.target.result.createObjectStore(a,{keyPath:"id",autoIncrement:!0})},new Promise(function(e,t){n.onerror=function(e){return t(e)},n.onsuccess=function(){e(n.result)}})}function l(e,t,n){var r={uuid:t,time:(new Date).getTime(),data:n},o=e.transaction([a],"readwrite");return new Promise(function(e,t){o.oncomplete=function(){return e()},o.onerror=function(e){return t(e)},o.objectStore(a).add(r)})}function d(e,n){var r=e.transaction(a).objectStore(a),o=[];return new Promise(function(t){(function(){try{var e=IDBKeyRange.bound(n+1,1/0);return r.openCursor(e)}catch(e){return r.openCursor()}})().onsuccess=function(e){e=e.target.result;e?e.value.idn.lastCursorId&&(n.lastCursorId=e.id),e}).filter(function(e){return t=n,(e=e).uuid!==t.uuid&&(!t.eMIs.has(e.id)&&!(e.data.time=e.time&&e.fn(t.data)})},n=e.method.microSeconds(),e._prepP?e._prepP.then(function(){e._iL=!0,e.method.onMessage(e._state,t,n)}):(e._iL=!0,e.method.onMessage(e._state,t,n)))}}(e)}function h(e,t,n){e._addEL[t]=e._addEL[t].filter(function(e){return e!==n}),function(e){{var t;e._iL&&!d(e)&&(e._iL=!1,t=e.method.microSeconds(),e.method.onMessage(e._state,null,t))}}(e)}(n.BroadcastChannel=a)._pubkey=!0,a.prototype={postMessage:function(e){if(this.closed)throw new Error("BroadcastChannel.postMessage(): Cannot post message after channel has closed");return l(this,"message",e)},postInternal:function(e){return l(this,"internal",e)},set onmessage(e){var t={time:this.method.microSeconds(),fn:e};h(this,"message",this._onML),e&&"function"==typeof e?(this._onML=t,f(this,"message",t)):this._onML=null},addEventListener:function(e,t){var n=this.method.microSeconds();f(this,e,{time:n,fn:t})},removeEventListener:function(e,t){var n=this._addEL[e].find(function(e){return e.fn===t});h(this,e,n)},close:function(){var e=this;if(!this.closed){s.delete(this),this.closed=!0;var t=this._prepP||o.PROMISE_RESOLVED_VOID;return this._onML=null,this._addEL.message=[],t.then(function(){return Promise.all(Array.from(e._uMP))}).then(function(){return Promise.all(e._befC.map(function(e){return e()}))}).then(function(){return e.method.close(e._state)})}},get type(){return this.method.type},get isClosed(){return this.closed}}},{"./method-chooser.js":6,"./options.js":11,"./util.js":12}],2:[function(e,t,n){"use strict";var r=e("./index.es5.js"),e=r.BroadcastChannel,r=r.createLeaderElection;window.BroadcastChannel2=e,window.createLeaderElection=r},{"./index.es5.js":3}],3:[function(e,t,n){"use strict";e=e("./index.js");t.exports={BroadcastChannel:e.BroadcastChannel,createLeaderElection:e.createLeaderElection,clearNodeFolder:e.clearNodeFolder,enforceOptions:e.enforceOptions,beLeader:e.beLeader}},{"./index.js":4}],4:[function(e,t,n){"use strict";Object.defineProperty(n,"__esModule",{value:!0}),Object.defineProperty(n,"BroadcastChannel",{enumerable:!0,get:function(){return r.BroadcastChannel}}),Object.defineProperty(n,"OPEN_BROADCAST_CHANNELS",{enumerable:!0,get:function(){return r.OPEN_BROADCAST_CHANNELS}}),Object.defineProperty(n,"beLeader",{enumerable:!0,get:function(){return o.beLeader}}),Object.defineProperty(n,"clearNodeFolder",{enumerable:!0,get:function(){return r.clearNodeFolder}}),Object.defineProperty(n,"createLeaderElection",{enumerable:!0,get:function(){return o.createLeaderElection}}),Object.defineProperty(n,"enforceOptions",{enumerable:!0,get:function(){return r.enforceOptions}});var r=e("./broadcast-channel"),o=e("./leader-election")},{"./broadcast-channel":1,"./leader-election":5}],5:[function(e,t,n){"use strict";Object.defineProperty(n,"__esModule",{value:!0}),n.beLeader=u,n.createLeaderElection=function(e,t){if(e._leaderElector)throw new Error("BroadcastChannel already has a leader-elector");t=function(e,t){e=e||{};(e=JSON.parse(JSON.stringify(e))).fallbackInterval||(e.fallbackInterval=3e3);e.responseTime||(e.responseTime=t.method.averageResponseTime(t.options));return e}(t,e);var n=new o(e,t);return e._befC.push(function(){return n.die()}),e._leaderElector=n};var s=e("./util.js"),r=e("unload"),o=function(e,t){var n=this;this.broadcastChannel=e,this._options=t,this.isLeader=!1,this.hasLeader=!1,this.isDead=!1,this.token=(0,s.randomToken)(),this._aplQ=s.PROMISE_RESOLVED_VOID,this._aplQC=0,this._unl=[],this._lstns=[],this._invs=[],this._dpL=function(){},this._dpLC=!1;function r(e){"leader"===e.context&&("death"===e.action&&(n.hasLeader=!1),"tell"===e.action&&(n.hasLeader=!0))}this.broadcastChannel.addEventListener("internal",r),this._lstns.push(r)};function a(e,t){t={context:"leader",action:t,token:e.token};return e.broadcastChannel.postInternal(t)}function u(t){t.isLeader=!0,t.hasLeader=!0;var e=(0,r.add)(function(){return t.die()});t._unl.push(e);function n(e){"leader"===e.context&&"apply"===e.action&&a(t,"tell"),"leader"!==e.context||"tell"!==e.action||t._dpLC||(t._dpLC=!0,t._dpL(),a(t,"tell"))}return t.broadcastChannel.addEventListener("internal",n),t._lstns.push(n),a(t,"tell")}o.prototype={applyOnce:function(){var i=this;if(this.isLeader)return(0,s.sleep)(0,!0);if(this.isDead)return(0,s.sleep)(0,!1);if(1i.token&&t(),"tell"===e.action&&(t(),i.hasLeader=!0))}var t,n=!1,r=new Promise(function(e){t=function(){n=!0,e()}}),o=[];return i.broadcastChannel.addEventListener("internal",e),a(i,"apply").then(function(){return Promise.race([(0,s.sleep)(i._options.responseTime/2),r.then(function(){return Promise.reject(new Error)})])}).then(function(){return a(i,"apply")}).then(function(){return Promise.race([(0,s.sleep)(i._options.responseTime/2),r.then(function(){return Promise.reject(new Error)})])}).catch(function(){}).then(function(){return i.broadcastChannel.removeEventListener("internal",e),!n&&u(i).then(function(){return!0})})}return this._aplQC=this._aplQC+1,this._aplQ=this._aplQ.then(e).then(function(){i._aplQC=i._aplQC-1}),this._aplQ.then(function(){return i.isLeader})},awaitLeadership:function(){return this._aLP||(this._aLP=function(i){if(i.isLeader)return s.PROMISE_RESOLVED_VOID;return new Promise(function(e){var t=!1;function n(){t||(t=!0,clearInterval(r),i.broadcastChannel.removeEventListener("internal",o),e(!0))}i.applyOnce().then(function(){i.isLeader&&n()});var r=setInterval(function(){console.log("applyOnce via fallbackInterval"),i.applyOnce().then(function(){i.isLeader&&n()})},i._options.fallbackInterval);i._invs.push(r);var o=function(e){"leader"===e.context&&"death"===e.action&&(i.hasLeader=!1,i.applyOnce().then(function(){i.isLeader&&n()}))};i.broadcastChannel.addEventListener("internal",o),i._lstns.push(o)})}(this)),this._aLP},set onduplicate(e){this._dpL=e},die:function(){var t=this;return this._lstns.forEach(function(e){return t.broadcastChannel.removeEventListener("internal",e)}),this._lstns=[],this._invs.forEach(function(e){return clearInterval(e)}),this._invs=[],this._unl.forEach(function(e){return e.remove()}),this._unl=[],this.isLeader&&(this.hasLeader=!1,this.isLeader=!1),this.isDead=!0,a(this,"death")}}},{"./util.js":12,unload:20}],6:[function(e,t,n){"use strict";var r=e("@babel/runtime/helpers/interopRequireDefault");e("@babel/runtime/helpers/typeof");Object.defineProperty(n,"__esModule",{value:!0}),n.chooseMethod=function(t){var e=[].concat(t.methods,u).filter(Boolean);if(t.type){if("simulate"===t.type)return s.default;var n=e.find(function(e){return e.type===t.type});if(n)return n;throw new Error("method-type "+t.type+" not found")}t.webWorkerSupport||a.isNode||(e=e.filter(function(e){return"idb"!==e.type}));e=e.find(function(e){return e.canBeUsed()});{if(e)return e;throw new Error("No useable method found in "+JSON.stringify(u.map(function(e){return e.type})))}};var o=r(e("./methods/native.js")),i=r(e("./methods/indexed-db.js")),n=r(e("./methods/localstorage.js")),s=r(e("./methods/simulate.js")),a=e("./util");var u=[o.default,i.default,n.default]},{"./methods/indexed-db.js":7,"./methods/localstorage.js":8,"./methods/native.js":9,"./methods/simulate.js":10,"./util":12,"@babel/runtime/helpers/interopRequireDefault":13,"@babel/runtime/helpers/typeof":14}],7:[function(e,t,n){"use strict";Object.defineProperty(n,"__esModule",{value:!0}),n.averageResponseTime=w,n.canBeUsed=y,n.cleanOldMessages=p,n.close=b,n.create=m,n.createDatabase=c,n.default=void 0,n.getAllMessages=function(e){var n=e.transaction(a).objectStore(a),r=[];return new Promise(function(t){n.openCursor().onsuccess=function(e){e=e.target.result;e?(r.push(e.value),e.continue()):t(r)}})},n.getIdb=u,n.getMessagesHigherThan=d,n.getOldMessages=h,n.microSeconds=void 0,n.onMessage=g,n.postMessage=_,n.removeMessageById=f,n.type=void 0,n.writeMessage=l;var o=e("../util.js"),i=e("oblivious-set"),s=e("../options"),e=o.microSeconds;n.microSeconds=e;var r="pubkey.broadcast-channel-0-",a="messages";function u(){if("undefined"!=typeof indexedDB)return indexedDB;if("undefined"!=typeof window){if(void 0!==window.mozIndexedDB)return window.mozIndexedDB;if(void 0!==window.webkitIndexedDB)return window.webkitIndexedDB;if(void 0!==window.msIndexedDB)return window.msIndexedDB}return!1}function c(e){var n=u().open(r+e,1);return n.onupgradeneeded=function(e){e.target.result.createObjectStore(a,{keyPath:"id",autoIncrement:!0})},new Promise(function(e,t){n.onerror=function(e){return t(e)},n.onsuccess=function(){e(n.result)}})}function l(e,t,n){var r={uuid:t,time:(new Date).getTime(),data:n},o=e.transaction([a],"readwrite");return new Promise(function(e,t){o.oncomplete=function(){return e()},o.onerror=function(e){return t(e)},o.objectStore(a).add(r)})}function d(e,n){var r=e.transaction(a).objectStore(a),o=[];return new Promise(function(t){(function(){try{var e=IDBKeyRange.bound(n+1,1/0);return r.openCursor(e)}catch(e){return r.openCursor()}})().onsuccess=function(e){e=e.target.result;e?e.value.idn.lastCursorId&&(n.lastCursorId=e.id),e}).filter(function(e){return t=n,(e=e).uuid!==t.uuid&&(!t.eMIs.has(e.id)&&!(e.data.time 1 && _args17[1] !== undefined ? _args17[1] : {}; + options = _args16.length > 1 && _args16[1] !== undefined ? _args16[1] : {}; options = (0, _options.fillOptionsWithDefaults)(options); time = microSeconds(); paths = getPaths(channelName); @@ -740,6 +742,15 @@ function _create() { paths: paths, // contains all messages that have been emitted before emittedMessagesIds: new _obliviousSet.ObliviousSet(options.node.ttl * 2), + + /** + * Used to ensure we do not write too many files at once + * which could throw an error. + * Must always be smaller then options.node.maxParallelWrites + */ + writeFileQueue: new _pQueue["default"]({ + concurrency: options.node.maxParallelWrites + }), messagesCallbackTime: null, messagesCallback: null, // ensures we do not read messages in parrallel @@ -753,17 +764,17 @@ function _create() { }; if (!OTHER_INSTANCES[channelName]) OTHER_INSTANCES[channelName] = []; OTHER_INSTANCES[channelName].push(state); - _context17.next = 11; + _context16.next = 11; return ensureFolderExistsPromise; case 11: - _context17.next = 13; + _context16.next = 13; return Promise.all([createSocketEventEmitter(channelName, uuid, paths), createSocketInfoFile(channelName, uuid, paths), refreshReaderClients(state)]); case 13: - _yield$Promise$all2 = _context17.sent; - socketEE = _yield$Promise$all2[0]; - infoFilePath = _yield$Promise$all2[1]; + _yield$Promise$all = _context16.sent; + socketEE = _yield$Promise$all[0]; + infoFilePath = _yield$Promise$all[1]; state.socketEE = socketEE; state.infoFilePath = infoFilePath; // when new message comes in, we read it and emit it @@ -782,14 +793,14 @@ function _create() { } }); }); - return _context17.abrupt("return", state); + return _context16.abrupt("return", state); case 20: case "end": - return _context17.stop(); + return _context16.stop(); } } - }, _callee17); + }, _callee16); })); return _create.apply(this, arguments); } @@ -824,31 +835,31 @@ function handleMessagePing(_x20, _x21) { function _handleMessagePing() { - _handleMessagePing = (0, _asyncToGenerator2["default"])( /*#__PURE__*/_regenerator["default"].mark(function _callee18(state, msgObj) { + _handleMessagePing = (0, _asyncToGenerator2["default"])( /*#__PURE__*/_regenerator["default"].mark(function _callee17(state, msgObj) { var messages, useMessages; - return _regenerator["default"].wrap(function _callee18$(_context18) { + return _regenerator["default"].wrap(function _callee17$(_context17) { while (1) { - switch (_context18.prev = _context18.next) { + switch (_context17.prev = _context17.next) { case 0: if (state.messagesCallback) { - _context18.next = 2; + _context17.next = 2; break; } - return _context18.abrupt("return"); + return _context17.abrupt("return"); case 2: if (msgObj) { - _context18.next = 8; + _context17.next = 8; break; } - _context18.next = 5; + _context17.next = 5; return getAllMessages(state.channelName, state.paths); case 5: - messages = _context18.sent; - _context18.next = 9; + messages = _context17.sent; + _context17.next = 9; break; case 8: @@ -864,14 +875,14 @@ function _handleMessagePing() { // if no listener or message, so not do anything if (!(!useMessages.length || !state.messagesCallback)) { - _context18.next = 12; + _context17.next = 12; break; } - return _context18.abrupt("return"); + return _context17.abrupt("return"); case 12: - _context18.next = 14; + _context17.next = 14; return Promise.all(useMessages.map(function (msgObj) { return readMessage(msgObj).then(function (content) { return msgObj.content = content; @@ -890,10 +901,10 @@ function _handleMessagePing() { case 15: case "end": - return _context18.stop(); + return _context17.stop(); } } - }, _callee18); + }, _callee17); })); return _handleMessagePing.apply(this, arguments); } @@ -1002,68 +1013,89 @@ function refreshReaderClients(channelState) { */ -function postMessage(channelState, messageJson) { - var writePromise = writeMessage(channelState.channelName, channelState.uuid, messageJson, channelState.paths); - channelState.writeBlockPromise = channelState.writeBlockPromise.then( /*#__PURE__*/(0, _asyncToGenerator2["default"])( /*#__PURE__*/_regenerator["default"].mark(function _callee3() { - var _yield$Promise$all, msgObj, pingStr, writeToReadersPromise; +function postMessage(_x24, _x25) { + return _postMessage.apply(this, arguments); +} +/** + * When multiple BroadcastChannels with the same name + * are created in a single node-process, we can access them directly and emit messages. + * This might not happen often in production + * but will speed up things when this module is used in unit-tests. + */ + - return _regenerator["default"].wrap(function _callee3$(_context3) { +function _postMessage() { + _postMessage = (0, _asyncToGenerator2["default"])( /*#__PURE__*/_regenerator["default"].mark(function _callee19(channelState, messageJson) { + var writePromise; + return _regenerator["default"].wrap(function _callee19$(_context19) { while (1) { - switch (_context3.prev = _context3.next) { + switch (_context19.prev = _context19.next) { case 0: - _context3.next = 2; - return new Promise(function (res) { - return setTimeout(res, 0); + writePromise = channelState.writeFileQueue.add(function () { + return writeMessage(channelState.channelName, channelState.uuid, messageJson, channelState.paths); }); + channelState.writeBlockPromise = channelState.writeBlockPromise.then( /*#__PURE__*/(0, _asyncToGenerator2["default"])( /*#__PURE__*/_regenerator["default"].mark(function _callee18() { + var _yield$Promise$all2, msgObj, pingStr, writeToReadersPromise; + + return _regenerator["default"].wrap(function _callee18$(_context18) { + while (1) { + switch (_context18.prev = _context18.next) { + case 0: + _context18.next = 2; + return new Promise(function (res) { + return setTimeout(res, 0); + }); + + case 2: + _context18.next = 4; + return Promise.all([writePromise, refreshReaderClients(channelState)]); + + case 4: + _yield$Promise$all2 = _context18.sent; + msgObj = _yield$Promise$all2[0]; + emitOverFastPath(channelState, msgObj, messageJson); + pingStr = '{"t":' + msgObj.time + ',"u":"' + msgObj.uuid + '","to":"' + msgObj.token + '"}|'; + writeToReadersPromise = Promise.all(Object.values(channelState.otherReaderClients).filter(function (client) { + return client.writable; + }) // client might have closed in between + .map(function (client) { + return new Promise(function (res) { + client.write(pingStr, res); + }); + })); + /** + * clean up old messages + * to not waste resources on cleaning up, + * only if random-int matches, we clean up old messages + */ + + if ((0, _util2.randomInt)(0, 20) === 0) { + /* await */ + getAllMessages(channelState.channelName, channelState.paths).then(function (allMessages) { + return cleanOldMessages(allMessages, channelState.options.node.ttl); + }); + } - case 2: - _context3.next = 4; - return Promise.all([writePromise, refreshReaderClients(channelState)]); + return _context18.abrupt("return", writeToReadersPromise); - case 4: - _yield$Promise$all = _context3.sent; - msgObj = _yield$Promise$all[0]; - emitOverFastPath(channelState, msgObj, messageJson); - pingStr = '{"t":' + msgObj.time + ',"u":"' + msgObj.uuid + '","to":"' + msgObj.token + '"}|'; - writeToReadersPromise = Promise.all(Object.values(channelState.otherReaderClients).filter(function (client) { - return client.writable; - }) // client might have closed in between - .map(function (client) { - return new Promise(function (res) { - client.write(pingStr, res); - }); - })); - /** - * clean up old messages - * to not waste resources on cleaning up, - * only if random-int matches, we clean up old messages - */ - - if ((0, _util2.randomInt)(0, 20) === 0) { - /* await */ - getAllMessages(channelState.channelName, channelState.paths).then(function (allMessages) { - return cleanOldMessages(allMessages, channelState.options.node.ttl); - }); - } - - return _context3.abrupt("return", writeToReadersPromise); + case 11: + case "end": + return _context18.stop(); + } + } + }, _callee18); + }))); + return _context19.abrupt("return", channelState.writeBlockPromise); - case 11: + case 3: case "end": - return _context3.stop(); + return _context19.stop(); } } - }, _callee3); - }))); - return channelState.writeBlockPromise; + }, _callee19); + })); + return _postMessage.apply(this, arguments); } -/** - * When multiple BroadcastChannels with the same name - * are created in a single node-process, we can access them directly and emit messages. - * This might not happen often in production - * but will speed up things when this module is used in unit-tests. - */ - function emitOverFastPath(state, msgObj, messageJson) { if (!state.options.node.useFastPath) { diff --git a/dist/lib/options.js b/dist/lib/options.js index 18d4d8a5..c8435340 100644 --- a/dist/lib/options.js +++ b/dist/lib/options.js @@ -26,6 +26,12 @@ function fillOptionsWithDefaults() { if (!options.node) options.node = {}; if (!options.node.ttl) options.node.ttl = 1000 * 60 * 2; // 2 minutes; + /** + * On linux use 'ulimit -Hn' to get the limit of open files. + * On ubuntu this was 4096 for me, so we use half of that as maxParallelWrites default. + */ + + if (!options.node.maxParallelWrites) options.node.maxParallelWrites = 2048; if (typeof options.node.useFastPath === 'undefined') options.node.useFastPath = true; return options; } \ No newline at end of file diff --git a/docs/e2e.js b/docs/e2e.js index 1964ca43..42dd6400 100644 --- a/docs/e2e.js +++ b/docs/e2e.js @@ -1545,6 +1545,12 @@ function fillOptionsWithDefaults() { if (!options.node) options.node = {}; if (!options.node.ttl) options.node.ttl = 1000 * 60 * 2; // 2 minutes; + /** + * On linux use 'ulimit -Hn' to get the limit of open files. + * On ubuntu this was 4096 for me, so we use half of that as maxParallelWrites default. + */ + + if (!options.node.maxParallelWrites) options.node.maxParallelWrites = 2048; if (typeof options.node.useFastPath === 'undefined') options.node.useFastPath = true; return options; } diff --git a/docs/iframe.js b/docs/iframe.js index 11d0b6de..0c69a639 100644 --- a/docs/iframe.js +++ b/docs/iframe.js @@ -1545,6 +1545,12 @@ function fillOptionsWithDefaults() { if (!options.node) options.node = {}; if (!options.node.ttl) options.node.ttl = 1000 * 60 * 2; // 2 minutes; + /** + * On linux use 'ulimit -Hn' to get the limit of open files. + * On ubuntu this was 4096 for me, so we use half of that as maxParallelWrites default. + */ + + if (!options.node.maxParallelWrites) options.node.maxParallelWrites = 2048; if (typeof options.node.useFastPath === 'undefined') options.node.useFastPath = true; return options; } diff --git a/docs/index.js b/docs/index.js index 3dc1eb4c..c671dd63 100644 --- a/docs/index.js +++ b/docs/index.js @@ -1545,6 +1545,12 @@ function fillOptionsWithDefaults() { if (!options.node) options.node = {}; if (!options.node.ttl) options.node.ttl = 1000 * 60 * 2; // 2 minutes; + /** + * On linux use 'ulimit -Hn' to get the limit of open files. + * On ubuntu this was 4096 for me, so we use half of that as maxParallelWrites default. + */ + + if (!options.node.maxParallelWrites) options.node.maxParallelWrites = 2048; if (typeof options.node.useFastPath === 'undefined') options.node.useFastPath = true; return options; } diff --git a/docs/leader-iframe.js b/docs/leader-iframe.js index 6b60c269..421b5f6c 100644 --- a/docs/leader-iframe.js +++ b/docs/leader-iframe.js @@ -1545,6 +1545,12 @@ function fillOptionsWithDefaults() { if (!options.node) options.node = {}; if (!options.node.ttl) options.node.ttl = 1000 * 60 * 2; // 2 minutes; + /** + * On linux use 'ulimit -Hn' to get the limit of open files. + * On ubuntu this was 4096 for me, so we use half of that as maxParallelWrites default. + */ + + if (!options.node.maxParallelWrites) options.node.maxParallelWrites = 2048; if (typeof options.node.useFastPath === 'undefined') options.node.useFastPath = true; return options; } diff --git a/docs/worker.js b/docs/worker.js index 6076532b..01d17241 100644 --- a/docs/worker.js +++ b/docs/worker.js @@ -1545,6 +1545,12 @@ function fillOptionsWithDefaults() { if (!options.node) options.node = {}; if (!options.node.ttl) options.node.ttl = 1000 * 60 * 2; // 2 minutes; + /** + * On linux use 'ulimit -Hn' to get the limit of open files. + * On ubuntu this was 4096 for me, so we use half of that as maxParallelWrites default. + */ + + if (!options.node.maxParallelWrites) options.node.maxParallelWrites = 2048; if (typeof options.node.useFastPath === 'undefined') options.node.useFastPath = true; return options; } diff --git a/package.json b/package.json index d92de1a7..4b5014cf 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "broadcast-channel", - "version": "4.6.0", + "version": "4.7.0", "description": "A BroadcastChannel that works in New Browsers, Old Browsers, WebWorkers and NodeJs", "exports": { ".": {