diff --git a/index.js b/index.js index 8e611e3..51a3352 100644 --- a/index.js +++ b/index.js @@ -3,11 +3,13 @@ import c from 'compact-encoding' import b4a from 'b4a' import Hypercore from 'hypercore' import { difference } from './utils/set-operations.js' +import { EventEmitter } from 'events' const DEBUG = false -export class AutobaseManager { +export class AutobaseManager extends EventEmitter { constructor (base, allow, get, storage, opts = {}) { + super() this.base = base this.allow = allow this.get = get @@ -66,17 +68,18 @@ export class AutobaseManager { const inputAnnouncer = channel.addMessage({ encoding: c.array(c.string), async onmessage (msgs, session) { - const allowedKeys = msgs.filter((msg) => self.allow(msg, 'input', session)) + const allowedKeysComputed = await Promise.all(msgs.map((msg) => self.allow(msg, 'input', session))) + const allowedKeys = msgs.filter((msg, i) => allowedKeysComputed[i]) if (allowedKeys.length) { DEBUG && console.log('[' + - b4a.toString(self.base.localOutput.key, 'hex').slice(-6) + + (self.base.localOutput ? b4a.toString(self.base.localOutput.key, 'hex').slice(-6) : 'N/A') + '] inputs allowedKeys ', allowedKeys.map((key) => key.slice(-6))) // Check if any are new const newKeys = difference(allowedKeys, self._inputKeys) if (newKeys.size > 0) { DEBUG && console.log('[' + - b4a.toString(self.base.localOutput.key, 'hex').slice(-6) + + (self.base.localOutput ? b4a.toString(self.base.localOutput.key, 'hex').slice(-6) : 'N/A') + '] new inputs', [...newKeys].map((key) => key.slice(-6))) await self._addKeys(newKeys, 'input') @@ -89,16 +92,17 @@ export class AutobaseManager { const outputAnnouncer = channel.addMessage({ encoding: c.array(c.string), async onmessage (msgs, session) { - const allowedKeys = msgs.filter((msg) => self.allow(msg, 'output', session)) + const allowedKeysComputed = await Promise.all(msgs.map((msg) => self.allow(msg, 'output', session))) + const allowedKeys = msgs.filter((msg, i) => allowedKeysComputed[i]) if (allowedKeys.length) { DEBUG && console.log('[' + - b4a.toString(self.base.localOutput.key, 'hex').slice(-6) + + (self.base.localOutput ? b4a.toString(self.base.localOutput.key, 'hex').slice(-6) : 'N/A') + '] outputs allowedKeys ', allowedKeys.map((key) => key.slice(-6))) // Check if any are new const newKeys = difference(allowedKeys, self._outputKeys) if (newKeys.size > 0) { DEBUG && console.log('[' + - b4a.toString(self.base.localOutput.key, 'hex').slice(-6) + + (self.base.localOutput ? b4a.toString(self.base.localOutput.key, 'hex').slice(-6) : 'N/A') + '] new outputs ', [...newKeys].map((key) => key.slice(-6))) await self._addKeys(newKeys, 'output') await self.updateStorageKeys() @@ -122,7 +126,7 @@ export class AutobaseManager { const keys = this.base.inputs.map((core) => b4a.toString(core.key, 'hex')) if (keys.length) { DEBUG && console.log('[' + - b4a.toString(this.base.localOutput.key, 'hex').slice(-6) + + (this.base.localOutput ? b4a.toString(this.base.localOutput.key, 'hex').slice(-6) : 'N/A') + '] announce keys', keys.map((key) => key.slice(-6))) stream.inputAnnouncer.send(keys) } @@ -130,7 +134,7 @@ export class AutobaseManager { const outputKeys = this.base.outputs.map((core) => b4a.toString(core.key, 'hex')) if (outputKeys.length) { DEBUG && console.log('[' + - b4a.toString(this.base.localOutput.key, 'hex').slice(-6) + + (this.base.localOutput ? b4a.toString(this.base.localOutput.key, 'hex').slice(-6) : 'N/A') + '] announce outputKeys', outputKeys.map((key) => key.slice(-6))) stream.outputAnnouncer.send(outputKeys) } @@ -171,6 +175,7 @@ export class AutobaseManager { this._inputKeys.add(b4a.toString(core.key, 'hex')) await this.base.addInput(core) } + this.emit('core-added', core, destination) } } diff --git a/tests/basics.js b/tests/basics.js index e032160..854d8d6 100644 --- a/tests/basics.js +++ b/tests/basics.js @@ -197,6 +197,78 @@ test('full replicate', (t) => { 'baseA got baseB\'s outputs & not denied cores') }) + t.test('filters input & output cores w/ async allow function', async (t) => { + t.plan(2) + const [storeA, baseA] = await create() + const [storeB, baseB] = await create() + + const falseCore = await storeA.get(Buffer.from('deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef', 'hex')) + const falseCore2 = await storeA.get(Buffer.from('feebdaedfeebdaedfeebdaedfeebdaedfeebdaedfeebdaedfeebdaedfeebdaed', 'hex')) + await baseA.addInput(falseCore) + await baseB.addOutput(falseCore2) + + const streamA = storeA.replicate(true) + const streamB = storeB.replicate(false) + + const denyList = [ + falseCore.key.toString('hex'), + falseCore2.key.toString('hex') + ] + + async function allow (key) { + await new Promise((resolve) => setTimeout(resolve, 100)) + return denyList.indexOf(key) === -1 + } + + const managerA = new AutobaseManager(baseA, allow, storeA.get.bind(storeA), + storeA.storage) + managerA.attachStream(streamA.noiseStream) + const managerB = new AutobaseManager(baseB, allow, storeB.get.bind(storeB), + storeB.storage) + managerB.attachStream(streamB.noiseStream) + + const debounceMS = 10 + const allCoresAdded = Promise.all([ + new Promise((resolve, reject) => { + const addedCores = { input: 0, output: 0 } + let debounceCoreAdded = null + managerA.on('core-added', (core, destination) => { + addedCores[destination]++ + if (addedCores.input > 0 && addedCores.output > 0) { + clearTimeout(debounceCoreAdded) + debounceCoreAdded = setTimeout(resolve, debounceMS) + } + }) + }), + new Promise((resolve, reject) => { + const addedCores = { input: 0, output: 0 } + let debounceCoreAdded = null + managerB.on('core-added', (core, destination) => { + addedCores[destination]++ + if (addedCores.input > 0 && addedCores.output > 0) { + clearTimeout(debounceCoreAdded) + debounceCoreAdded = setTimeout(resolve, debounceMS) + } + }) + }) + ]) + + pipeline([ + streamA, + streamB, + streamA + ]) + + await allCoresAdded + + t.deepEqual(baseB.inputs.map((core) => core.key), + [baseB.localInput, baseA.localInput].map((core) => core.key), + 'baseB got baseA\'s inputs & not denied cores') + t.deepEqual(baseA.outputs.map((core) => core.key), + [baseA.localOutput, baseB.localOutput].map((core) => core.key), + 'baseA got baseB\'s outputs & not denied cores') + }) + t.test('removes stream on close', async (t) => { t.plan(4) const [storeA, baseA] = await create()