Skip to content

Commit

Permalink
Merge branch 'core-added-event' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
lejeunerenard committed Mar 21, 2023
2 parents edc2426 + 41c1242 commit fad97fa
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 9 deletions.
23 changes: 14 additions & 9 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ import c from 'compact-encoding'
import b4a from 'b4a'
import Hypercore from 'hypercore'
import { difference } from './utils/set-operations.js'
import { EventEmitter } from 'events'

const DEBUG = false

export class AutobaseManager {
export class AutobaseManager extends EventEmitter {
constructor (base, allow, get, storage, opts = {}) {
super()
this.base = base
this.allow = allow
this.get = get
Expand Down Expand Up @@ -66,17 +68,18 @@ export class AutobaseManager {
const inputAnnouncer = channel.addMessage({
encoding: c.array(c.string),
async onmessage (msgs, session) {
const allowedKeys = msgs.filter((msg) => self.allow(msg, 'input', session))
const allowedKeysComputed = await Promise.all(msgs.map((msg) => self.allow(msg, 'input', session)))
const allowedKeys = msgs.filter((msg, i) => allowedKeysComputed[i])
if (allowedKeys.length) {
DEBUG && console.log('[' +
b4a.toString(self.base.localOutput.key, 'hex').slice(-6) +
(self.base.localOutput ? b4a.toString(self.base.localOutput.key, 'hex').slice(-6) : 'N/A') +
'] inputs allowedKeys ', allowedKeys.map((key) => key.slice(-6)))

// Check if any are new
const newKeys = difference(allowedKeys, self._inputKeys)
if (newKeys.size > 0) {
DEBUG && console.log('[' +
b4a.toString(self.base.localOutput.key, 'hex').slice(-6) +
(self.base.localOutput ? b4a.toString(self.base.localOutput.key, 'hex').slice(-6) : 'N/A') +
'] new inputs', [...newKeys].map((key) => key.slice(-6)))

await self._addKeys(newKeys, 'input')
Expand All @@ -89,16 +92,17 @@ export class AutobaseManager {
const outputAnnouncer = channel.addMessage({
encoding: c.array(c.string),
async onmessage (msgs, session) {
const allowedKeys = msgs.filter((msg) => self.allow(msg, 'output', session))
const allowedKeysComputed = await Promise.all(msgs.map((msg) => self.allow(msg, 'output', session)))
const allowedKeys = msgs.filter((msg, i) => allowedKeysComputed[i])
if (allowedKeys.length) {
DEBUG && console.log('[' +
b4a.toString(self.base.localOutput.key, 'hex').slice(-6) +
(self.base.localOutput ? b4a.toString(self.base.localOutput.key, 'hex').slice(-6) : 'N/A') +
'] outputs allowedKeys ', allowedKeys.map((key) => key.slice(-6)))
// Check if any are new
const newKeys = difference(allowedKeys, self._outputKeys)
if (newKeys.size > 0) {
DEBUG && console.log('[' +
b4a.toString(self.base.localOutput.key, 'hex').slice(-6) +
(self.base.localOutput ? b4a.toString(self.base.localOutput.key, 'hex').slice(-6) : 'N/A') +
'] new outputs ', [...newKeys].map((key) => key.slice(-6)))
await self._addKeys(newKeys, 'output')
await self.updateStorageKeys()
Expand All @@ -122,15 +126,15 @@ export class AutobaseManager {
const keys = this.base.inputs.map((core) => b4a.toString(core.key, 'hex'))
if (keys.length) {
DEBUG && console.log('[' +
b4a.toString(this.base.localOutput.key, 'hex').slice(-6) +
(this.base.localOutput ? b4a.toString(this.base.localOutput.key, 'hex').slice(-6) : 'N/A') +
'] announce keys', keys.map((key) => key.slice(-6)))
stream.inputAnnouncer.send(keys)
}

const outputKeys = this.base.outputs.map((core) => b4a.toString(core.key, 'hex'))
if (outputKeys.length) {
DEBUG && console.log('[' +
b4a.toString(this.base.localOutput.key, 'hex').slice(-6) +
(this.base.localOutput ? b4a.toString(this.base.localOutput.key, 'hex').slice(-6) : 'N/A') +
'] announce outputKeys', outputKeys.map((key) => key.slice(-6)))
stream.outputAnnouncer.send(outputKeys)
}
Expand Down Expand Up @@ -171,6 +175,7 @@ export class AutobaseManager {
this._inputKeys.add(b4a.toString(core.key, 'hex'))
await this.base.addInput(core)
}
this.emit('core-added', core, destination)
}
}

Expand Down
72 changes: 72 additions & 0 deletions tests/basics.js
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,78 @@ test('full replicate', (t) => {
'baseA got baseB\'s outputs & not denied cores')
})

t.test('filters input & output cores w/ async allow function', async (t) => {
t.plan(2)
const [storeA, baseA] = await create()
const [storeB, baseB] = await create()

const falseCore = await storeA.get(Buffer.from('deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef', 'hex'))
const falseCore2 = await storeA.get(Buffer.from('feebdaedfeebdaedfeebdaedfeebdaedfeebdaedfeebdaedfeebdaedfeebdaed', 'hex'))
await baseA.addInput(falseCore)
await baseB.addOutput(falseCore2)

const streamA = storeA.replicate(true)
const streamB = storeB.replicate(false)

const denyList = [
falseCore.key.toString('hex'),
falseCore2.key.toString('hex')
]

async function allow (key) {
await new Promise((resolve) => setTimeout(resolve, 100))
return denyList.indexOf(key) === -1
}

const managerA = new AutobaseManager(baseA, allow, storeA.get.bind(storeA),
storeA.storage)
managerA.attachStream(streamA.noiseStream)
const managerB = new AutobaseManager(baseB, allow, storeB.get.bind(storeB),
storeB.storage)
managerB.attachStream(streamB.noiseStream)

const debounceMS = 10
const allCoresAdded = Promise.all([
new Promise((resolve, reject) => {
const addedCores = { input: 0, output: 0 }
let debounceCoreAdded = null
managerA.on('core-added', (core, destination) => {
addedCores[destination]++
if (addedCores.input > 0 && addedCores.output > 0) {
clearTimeout(debounceCoreAdded)
debounceCoreAdded = setTimeout(resolve, debounceMS)
}
})
}),
new Promise((resolve, reject) => {
const addedCores = { input: 0, output: 0 }
let debounceCoreAdded = null
managerB.on('core-added', (core, destination) => {
addedCores[destination]++
if (addedCores.input > 0 && addedCores.output > 0) {
clearTimeout(debounceCoreAdded)
debounceCoreAdded = setTimeout(resolve, debounceMS)
}
})
})
])

pipeline([
streamA,
streamB,
streamA
])

await allCoresAdded

t.deepEqual(baseB.inputs.map((core) => core.key),
[baseB.localInput, baseA.localInput].map((core) => core.key),
'baseB got baseA\'s inputs & not denied cores')
t.deepEqual(baseA.outputs.map((core) => core.key),
[baseA.localOutput, baseB.localOutput].map((core) => core.key),
'baseA got baseB\'s outputs & not denied cores')
})

t.test('removes stream on close', async (t) => {
t.plan(4)
const [storeA, baseA] = await create()
Expand Down

0 comments on commit fad97fa

Please sign in to comment.