Skip to content

Commit

Permalink
feat: Add async allow function support
Browse files Browse the repository at this point in the history
Enables more scenarios for vetting adding a core especially since
getting any blocks from a core is an async operation.
  • Loading branch information
lejeunerenard committed Mar 21, 2023
1 parent bb2219b commit bc644cf
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 2 deletions.
6 changes: 4 additions & 2 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ export class AutobaseManager extends EventEmitter {
const inputAnnouncer = channel.addMessage({
encoding: c.array(c.string),
async onmessage (msgs, session) {
const allowedKeys = msgs.filter((msg) => self.allow(msg, 'input', session))
const allowedKeysComputed = await Promise.all(msgs.map((msg) => self.allow(msg, 'input', session)))
const allowedKeys = msgs.filter((msg, i) => allowedKeysComputed[i])
if (allowedKeys.length) {
DEBUG && console.log('[' +
b4a.toString(self.base.localOutput.key, 'hex').slice(-6) +
Expand All @@ -91,7 +92,8 @@ export class AutobaseManager extends EventEmitter {
const outputAnnouncer = channel.addMessage({
encoding: c.array(c.string),
async onmessage (msgs, session) {
const allowedKeys = msgs.filter((msg) => self.allow(msg, 'output', session))
const allowedKeysComputed = await Promise.all(msgs.map((msg) => self.allow(msg, 'output', session)))
const allowedKeys = msgs.filter((msg, i) => allowedKeysComputed[i])
if (allowedKeys.length) {
DEBUG && console.log('[' +
b4a.toString(self.base.localOutput.key, 'hex').slice(-6) +
Expand Down
72 changes: 72 additions & 0 deletions tests/basics.js
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,78 @@ test('full replicate', (t) => {
'baseA got baseB\'s outputs & not denied cores')
})

t.test('filters input & output cores w/ async allow function', async (t) => {
t.plan(2)
const [storeA, baseA] = await create()
const [storeB, baseB] = await create()

const falseCore = await storeA.get(Buffer.from('deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef', 'hex'))
const falseCore2 = await storeA.get(Buffer.from('feebdaedfeebdaedfeebdaedfeebdaedfeebdaedfeebdaedfeebdaedfeebdaed', 'hex'))
await baseA.addInput(falseCore)
await baseB.addOutput(falseCore2)

const streamA = storeA.replicate(true)
const streamB = storeB.replicate(false)

const denyList = [
falseCore.key.toString('hex'),
falseCore2.key.toString('hex')
]

async function allow (key) {
await new Promise((resolve) => setTimeout(resolve, 100))
return denyList.indexOf(key) === -1
}

const managerA = new AutobaseManager(baseA, allow, storeA.get.bind(storeA),
storeA.storage)
managerA.attachStream(streamA.noiseStream)
const managerB = new AutobaseManager(baseB, allow, storeB.get.bind(storeB),
storeB.storage)
managerB.attachStream(streamB.noiseStream)

const debounceMS = 10
const allCoresAdded = Promise.all([
new Promise((resolve, reject) => {
const addedCores = { input: 0, output: 0 }
let debounceCoreAdded = null
managerA.on('core-added', (core, destination) => {
addedCores[destination]++
if (addedCores.input > 0 && addedCores.output > 0) {
clearTimeout(debounceCoreAdded)
debounceCoreAdded = setTimeout(resolve, debounceMS)
}
})
}),
new Promise((resolve, reject) => {
const addedCores = { input: 0, output: 0 }
let debounceCoreAdded = null
managerB.on('core-added', (core, destination) => {
addedCores[destination]++
if (addedCores.input > 0 && addedCores.output > 0) {
clearTimeout(debounceCoreAdded)
debounceCoreAdded = setTimeout(resolve, debounceMS)
}
})
})
])

pipeline([
streamA,
streamB,
streamA
])

await allCoresAdded

t.deepEqual(baseB.inputs.map((core) => core.key),
[baseB.localInput, baseA.localInput].map((core) => core.key),
'baseB got baseA\'s inputs & not denied cores')
t.deepEqual(baseA.outputs.map((core) => core.key),
[baseA.localOutput, baseB.localOutput].map((core) => core.key),
'baseA got baseB\'s outputs & not denied cores')
})

t.test('removes stream on close', async (t) => {
t.plan(4)
const [storeA, baseA] = await create()
Expand Down

0 comments on commit bc644cf

Please sign in to comment.