diff --git a/index.js b/index.js index 26bad2b..9032e03 100644 --- a/index.js +++ b/index.js @@ -68,7 +68,8 @@ export class AutobaseManager extends EventEmitter { const inputAnnouncer = channel.addMessage({ encoding: c.array(c.string), async onmessage (msgs, session) { - const allowedKeys = msgs.filter((msg) => self.allow(msg, 'input', session)) + const allowedKeysComputed = await Promise.all(msgs.map((msg) => self.allow(msg, 'input', session))) + const allowedKeys = msgs.filter((msg, i) => allowedKeysComputed[i]) if (allowedKeys.length) { DEBUG && console.log('[' + b4a.toString(self.base.localOutput.key, 'hex').slice(-6) + @@ -91,7 +92,8 @@ export class AutobaseManager extends EventEmitter { const outputAnnouncer = channel.addMessage({ encoding: c.array(c.string), async onmessage (msgs, session) { - const allowedKeys = msgs.filter((msg) => self.allow(msg, 'output', session)) + const allowedKeysComputed = await Promise.all(msgs.map((msg) => self.allow(msg, 'output', session))) + const allowedKeys = msgs.filter((msg, i) => allowedKeysComputed[i]) if (allowedKeys.length) { DEBUG && console.log('[' + b4a.toString(self.base.localOutput.key, 'hex').slice(-6) + diff --git a/tests/basics.js b/tests/basics.js index e032160..854d8d6 100644 --- a/tests/basics.js +++ b/tests/basics.js @@ -197,6 +197,78 @@ test('full replicate', (t) => { 'baseA got baseB\'s outputs & not denied cores') }) + t.test('filters input & output cores w/ async allow function', async (t) => { + t.plan(2) + const [storeA, baseA] = await create() + const [storeB, baseB] = await create() + + const falseCore = await storeA.get(Buffer.from('deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef', 'hex')) + const falseCore2 = await storeA.get(Buffer.from('feebdaedfeebdaedfeebdaedfeebdaedfeebdaedfeebdaedfeebdaedfeebdaed', 'hex')) + await baseA.addInput(falseCore) + await baseB.addOutput(falseCore2) + + const streamA = storeA.replicate(true) + const streamB = storeB.replicate(false) + + const denyList = [ + falseCore.key.toString('hex'), + falseCore2.key.toString('hex') + ] + + async function allow (key) { + await new Promise((resolve) => setTimeout(resolve, 100)) + return denyList.indexOf(key) === -1 + } + + const managerA = new AutobaseManager(baseA, allow, storeA.get.bind(storeA), + storeA.storage) + managerA.attachStream(streamA.noiseStream) + const managerB = new AutobaseManager(baseB, allow, storeB.get.bind(storeB), + storeB.storage) + managerB.attachStream(streamB.noiseStream) + + const debounceMS = 10 + const allCoresAdded = Promise.all([ + new Promise((resolve, reject) => { + const addedCores = { input: 0, output: 0 } + let debounceCoreAdded = null + managerA.on('core-added', (core, destination) => { + addedCores[destination]++ + if (addedCores.input > 0 && addedCores.output > 0) { + clearTimeout(debounceCoreAdded) + debounceCoreAdded = setTimeout(resolve, debounceMS) + } + }) + }), + new Promise((resolve, reject) => { + const addedCores = { input: 0, output: 0 } + let debounceCoreAdded = null + managerB.on('core-added', (core, destination) => { + addedCores[destination]++ + if (addedCores.input > 0 && addedCores.output > 0) { + clearTimeout(debounceCoreAdded) + debounceCoreAdded = setTimeout(resolve, debounceMS) + } + }) + }) + ]) + + pipeline([ + streamA, + streamB, + streamA + ]) + + await allCoresAdded + + t.deepEqual(baseB.inputs.map((core) => core.key), + [baseB.localInput, baseA.localInput].map((core) => core.key), + 'baseB got baseA\'s inputs & not denied cores') + t.deepEqual(baseA.outputs.map((core) => core.key), + [baseA.localOutput, baseB.localOutput].map((core) => core.key), + 'baseA got baseB\'s outputs & not denied cores') + }) + t.test('removes stream on close', async (t) => { t.plan(4) const [storeA, baseA] = await create()