From bb2219b536c812f51644eccafc6fb22316688e9a Mon Sep 17 00:00:00 2001 From: Sean Zellmer Date: Wed, 15 Mar 2023 16:31:09 -0500 Subject: [PATCH 1/3] feat: Add `core-added` event when a core is added to autobase This allows one to react to new cores being added by `autobase-manager` to the underlying autobase. --- index.js | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/index.js b/index.js index 8e611e3..26bad2b 100644 --- a/index.js +++ b/index.js @@ -3,11 +3,13 @@ import c from 'compact-encoding' import b4a from 'b4a' import Hypercore from 'hypercore' import { difference } from './utils/set-operations.js' +import { EventEmitter } from 'events' const DEBUG = false -export class AutobaseManager { +export class AutobaseManager extends EventEmitter { constructor (base, allow, get, storage, opts = {}) { + super() this.base = base this.allow = allow this.get = get @@ -171,6 +173,7 @@ export class AutobaseManager { this._inputKeys.add(b4a.toString(core.key, 'hex')) await this.base.addInput(core) } + this.emit('core-added', core, destination) } } From bc644cfaaffe3e262d2a7ad256a95cc6378e3a80 Mon Sep 17 00:00:00 2001 From: Sean Zellmer Date: Tue, 21 Mar 2023 16:22:00 -0500 Subject: [PATCH 2/3] feat: Add async `allow` function support Enables more scenarios for vetting adding a core especially since getting any blocks from a core is an async operation. --- index.js | 6 +++-- tests/basics.js | 72 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 76 insertions(+), 2 deletions(-) diff --git a/index.js b/index.js index 26bad2b..9032e03 100644 --- a/index.js +++ b/index.js @@ -68,7 +68,8 @@ export class AutobaseManager extends EventEmitter { const inputAnnouncer = channel.addMessage({ encoding: c.array(c.string), async onmessage (msgs, session) { - const allowedKeys = msgs.filter((msg) => self.allow(msg, 'input', session)) + const allowedKeysComputed = await Promise.all(msgs.map((msg) => self.allow(msg, 'input', session))) + const allowedKeys = msgs.filter((msg, i) => allowedKeysComputed[i]) if (allowedKeys.length) { DEBUG && console.log('[' + b4a.toString(self.base.localOutput.key, 'hex').slice(-6) + @@ -91,7 +92,8 @@ export class AutobaseManager extends EventEmitter { const outputAnnouncer = channel.addMessage({ encoding: c.array(c.string), async onmessage (msgs, session) { - const allowedKeys = msgs.filter((msg) => self.allow(msg, 'output', session)) + const allowedKeysComputed = await Promise.all(msgs.map((msg) => self.allow(msg, 'output', session))) + const allowedKeys = msgs.filter((msg, i) => allowedKeysComputed[i]) if (allowedKeys.length) { DEBUG && console.log('[' + b4a.toString(self.base.localOutput.key, 'hex').slice(-6) + diff --git a/tests/basics.js b/tests/basics.js index e032160..854d8d6 100644 --- a/tests/basics.js +++ b/tests/basics.js @@ -197,6 +197,78 @@ test('full replicate', (t) => { 'baseA got baseB\'s outputs & not denied cores') }) + t.test('filters input & output cores w/ async allow function', async (t) => { + t.plan(2) + const [storeA, baseA] = await create() + const [storeB, baseB] = await create() + + const falseCore = await storeA.get(Buffer.from('deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef', 'hex')) + const falseCore2 = await storeA.get(Buffer.from('feebdaedfeebdaedfeebdaedfeebdaedfeebdaedfeebdaedfeebdaedfeebdaed', 'hex')) + await baseA.addInput(falseCore) + await baseB.addOutput(falseCore2) + + const streamA = storeA.replicate(true) + const streamB = storeB.replicate(false) + + const denyList = [ + falseCore.key.toString('hex'), + falseCore2.key.toString('hex') + ] + + async function allow (key) { + await new Promise((resolve) => setTimeout(resolve, 100)) + return denyList.indexOf(key) === -1 + } + + const managerA = new AutobaseManager(baseA, allow, storeA.get.bind(storeA), + storeA.storage) + managerA.attachStream(streamA.noiseStream) + const managerB = new AutobaseManager(baseB, allow, storeB.get.bind(storeB), + storeB.storage) + managerB.attachStream(streamB.noiseStream) + + const debounceMS = 10 + const allCoresAdded = Promise.all([ + new Promise((resolve, reject) => { + const addedCores = { input: 0, output: 0 } + let debounceCoreAdded = null + managerA.on('core-added', (core, destination) => { + addedCores[destination]++ + if (addedCores.input > 0 && addedCores.output > 0) { + clearTimeout(debounceCoreAdded) + debounceCoreAdded = setTimeout(resolve, debounceMS) + } + }) + }), + new Promise((resolve, reject) => { + const addedCores = { input: 0, output: 0 } + let debounceCoreAdded = null + managerB.on('core-added', (core, destination) => { + addedCores[destination]++ + if (addedCores.input > 0 && addedCores.output > 0) { + clearTimeout(debounceCoreAdded) + debounceCoreAdded = setTimeout(resolve, debounceMS) + } + }) + }) + ]) + + pipeline([ + streamA, + streamB, + streamA + ]) + + await allCoresAdded + + t.deepEqual(baseB.inputs.map((core) => core.key), + [baseB.localInput, baseA.localInput].map((core) => core.key), + 'baseB got baseA\'s inputs & not denied cores') + t.deepEqual(baseA.outputs.map((core) => core.key), + [baseA.localOutput, baseB.localOutput].map((core) => core.key), + 'baseA got baseB\'s outputs & not denied cores') + }) + t.test('removes stream on close', async (t) => { t.plan(4) const [storeA, baseA] = await create() From 41c1242869af402da47aa0e15e1306af17e0faf7 Mon Sep 17 00:00:00 2001 From: Sean Zellmer Date: Tue, 21 Mar 2023 16:23:42 -0500 Subject: [PATCH 3/3] fix: Allow autobases w/o localOutput to use debug logs --- index.js | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/index.js b/index.js index 9032e03..51a3352 100644 --- a/index.js +++ b/index.js @@ -72,14 +72,14 @@ export class AutobaseManager extends EventEmitter { const allowedKeys = msgs.filter((msg, i) => allowedKeysComputed[i]) if (allowedKeys.length) { DEBUG && console.log('[' + - b4a.toString(self.base.localOutput.key, 'hex').slice(-6) + + (self.base.localOutput ? b4a.toString(self.base.localOutput.key, 'hex').slice(-6) : 'N/A') + '] inputs allowedKeys ', allowedKeys.map((key) => key.slice(-6))) // Check if any are new const newKeys = difference(allowedKeys, self._inputKeys) if (newKeys.size > 0) { DEBUG && console.log('[' + - b4a.toString(self.base.localOutput.key, 'hex').slice(-6) + + (self.base.localOutput ? b4a.toString(self.base.localOutput.key, 'hex').slice(-6) : 'N/A') + '] new inputs', [...newKeys].map((key) => key.slice(-6))) await self._addKeys(newKeys, 'input') @@ -96,13 +96,13 @@ export class AutobaseManager extends EventEmitter { const allowedKeys = msgs.filter((msg, i) => allowedKeysComputed[i]) if (allowedKeys.length) { DEBUG && console.log('[' + - b4a.toString(self.base.localOutput.key, 'hex').slice(-6) + + (self.base.localOutput ? b4a.toString(self.base.localOutput.key, 'hex').slice(-6) : 'N/A') + '] outputs allowedKeys ', allowedKeys.map((key) => key.slice(-6))) // Check if any are new const newKeys = difference(allowedKeys, self._outputKeys) if (newKeys.size > 0) { DEBUG && console.log('[' + - b4a.toString(self.base.localOutput.key, 'hex').slice(-6) + + (self.base.localOutput ? b4a.toString(self.base.localOutput.key, 'hex').slice(-6) : 'N/A') + '] new outputs ', [...newKeys].map((key) => key.slice(-6))) await self._addKeys(newKeys, 'output') await self.updateStorageKeys() @@ -126,7 +126,7 @@ export class AutobaseManager extends EventEmitter { const keys = this.base.inputs.map((core) => b4a.toString(core.key, 'hex')) if (keys.length) { DEBUG && console.log('[' + - b4a.toString(this.base.localOutput.key, 'hex').slice(-6) + + (this.base.localOutput ? b4a.toString(this.base.localOutput.key, 'hex').slice(-6) : 'N/A') + '] announce keys', keys.map((key) => key.slice(-6))) stream.inputAnnouncer.send(keys) } @@ -134,7 +134,7 @@ export class AutobaseManager extends EventEmitter { const outputKeys = this.base.outputs.map((core) => b4a.toString(core.key, 'hex')) if (outputKeys.length) { DEBUG && console.log('[' + - b4a.toString(this.base.localOutput.key, 'hex').slice(-6) + + (this.base.localOutput ? b4a.toString(this.base.localOutput.key, 'hex').slice(-6) : 'N/A') + '] announce outputKeys', outputKeys.map((key) => key.slice(-6))) stream.outputAnnouncer.send(outputKeys) }