Skip to content

Commit

Permalink
refact: Manage across multiple attached streams
Browse files Browse the repository at this point in the history
Used sets for managing the known keys because they should be unique and
finding the difference between the known keys and new announced keys is
easy. Might want to consider basing keys entirely on the autobase
properties however. Having a separate collection of keys seems more
prone to error.

Given there are multiple streams now, `announceAll()` was added.
  • Loading branch information
lejeunerenard committed Jan 29, 2023
1 parent 4ef3d8a commit a7aa8ca
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 43 deletions.
104 changes: 77 additions & 27 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,66 +1,116 @@
import Protomux from 'protomux'
import c from 'compact-encoding'
import b4a from 'b4a'
import { difference } from './utils/set-operations.js'

export class AutobaseManager {
constructor (stream, base, allow, get) {
constructor (base, allow, get) {
this.base = base
this.allow = allow
this.get = get

this._inputKeys = new Set()
this._outputKeys = new Set()
this._streams = []

if (this.base.localInput) {
this._addKeys([this.base.localInput.key.toString('hex')], 'input')
}
if (this.base.localOutput) {
this._addKeys([this.base.localOutput.key.toString('hex')], 'output')
}
}

attachStream (stream) {
const self = this

const mux = Protomux.from(stream)

const channel = mux.createChannel({ protocol: 'autobase-manager' })
channel.open()

this.inputAnnouncer = channel.addMessage({
const inputAnnouncer = channel.addMessage({
encoding: c.array(c.string),
async onmessage (msgs, session) {
const allowedKeys = msgs.filter((msg) => allow(msg, 'input', session))
const allowedKeys = msgs.filter((msg) => self.allow(msg, 'input', session))
if (allowedKeys.length) {
for (const key of allowedKeys) {
const core = get(b4a.from(key, 'hex'))
await core.ready() // seems necessary for autobase id setup

await base.addInput(core)
// Check if any are new
const newKeys = difference(allowedKeys, self._inputKeys)
if (newKeys.size > 0) {
await self._addKeys(newKeys, 'input')
}
}
}
})

this.outputAnnouncer = channel.addMessage({
const outputAnnouncer = channel.addMessage({
encoding: c.array(c.string),
async onmessage (msgs, session) {
const allowedKeys = msgs.filter((msg) => allow(msg, 'output', session))
const allowedKeys = msgs.filter((msg) => self.allow(msg, 'output', session))
if (allowedKeys.length) {
for (const key of allowedKeys) {
const core = get(b4a.from(key, 'hex'))

// Necessary for autobase id (aka the core's id) setup
await core.ready()

// Update output to ensure up to date before adding
// Get a 'Batch is out-of-date.' error otherwise
if (base.started) {
await base.view.update()
}

await base.addOutput(core)
// Check if any are new
const newKeys = difference(allowedKeys, self._outputKeys)
if (newKeys.size > 0) {
await self._addKeys(newKeys, 'output')
}
}
}
})

if (this.base.localInput || this.base.inputs || this.base.outputs || this.base.localOutput) this.announce()
const streamRecord = { stream, inputAnnouncer, outputAnnouncer }
this._streams.push(streamRecord)
stream.once('close', () => {
this._streams.slice(this._streams.indexOf(streamRecord), 1)
})

if (this.base.localInput || this.base.inputs || this.base.outputs || this.base.localOutput) this.announce(streamRecord)
}

announce () {
async announce (stream) {
const keys = this.base.inputs.map((core) => core.key.toString('hex'))
if (keys.length) {
this.inputAnnouncer.send(keys)
// console.log('[' + this.base.localOutput.key.toString('hex').slice(-6) +
// '] announce keys', keys.map((key) => key.slice(-6)))
stream.inputAnnouncer.send(keys)
}

const outputKeys = this.base.outputs.map((core) => core.key.toString('hex'))
if (outputKeys.length) {
this.outputAnnouncer.send(outputKeys)
// console.log('[' + this.base.localOutput.key.toString('hex').slice(-6) +
// '] announce outputKeys', outputKeys.map((key) => key.slice(-6)))
stream.outputAnnouncer.send(outputKeys)
}
}

async announceAll () {
for (const stream of this._streams) {
await this.announce(stream)
}
}

async _addKeys (keys, destination) {
// Get & Ready Cores
const cores = await Promise.all(Array.from(keys).map(async (key) => {
const core = this.get(b4a.from(key, 'hex'))
// Necessary for autobase id (aka the core's id) setup
await core.ready()
return core
}))

// Add to the corresponding place in autobase
for (const core of cores) {
if (destination === 'output') {
this._outputKeys.add(core.key.toString('hex'))

// Update output to ensure up to date before adding
// Get a 'Batch is out-of-date.' error otherwise
if (this.base.started) await this.base.view.update()

await this.base.addOutput(core)
} else {
this._inputKeys.add(core.key.toString('hex'))
await this.base.addInput(core)
}
}
}
}
78 changes: 62 additions & 16 deletions tests/basics.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,12 @@ test('full replicate', (t) => {
const streamA = storeA.replicate(true)
const streamB = storeB.replicate(false)

const managerA = new AutobaseManager(streamA.noiseStream, baseA,
() => true, storeA.get.bind(storeA))
const managerB = new AutobaseManager(streamB.noiseStream, baseB,
() => true, storeB.get.bind(storeB))
const managerA = new AutobaseManager(baseA, () => true,
storeA.get.bind(storeA))
managerA.attachStream(streamA.noiseStream)
const managerB = new AutobaseManager(baseB, () => true,
storeB.get.bind(storeB))
managerB.attachStream(streamB.noiseStream)

pipeline([
streamA,
Expand All @@ -62,10 +64,12 @@ test('full replicate', (t) => {
const streamA = storeA.replicate(true)
const streamB = storeB.replicate(false)

const managerA = new AutobaseManager(streamA.noiseStream, baseA,
() => true, storeA.get.bind(storeA))
const managerB = new AutobaseManager(streamB.noiseStream, baseB,
() => true, storeB.get.bind(storeB))
const managerA = new AutobaseManager(baseA, () => true,
storeA.get.bind(storeA))
managerA.attachStream(streamA.noiseStream)
const managerB = new AutobaseManager(baseB, () => true,
storeB.get.bind(storeB))
managerB.attachStream(streamB.noiseStream)

pipeline([
streamA,
Expand All @@ -90,10 +94,12 @@ test('full replicate', (t) => {
const streamA = storeA.replicate(true)
const streamB = storeB.replicate(false)

const managerA = new AutobaseManager(streamA.noiseStream, baseA,
() => true, storeA.get.bind(storeA))
const managerB = new AutobaseManager(streamB.noiseStream, baseB,
() => true, storeB.get.bind(storeB))
const managerA = new AutobaseManager(baseA, () => true,
storeA.get.bind(storeA))
managerA.attachStream(streamA.noiseStream)
const managerB = new AutobaseManager(baseB, () => true,
storeB.get.bind(storeB))
managerB.attachStream(streamB.noiseStream)

pipeline([
streamA,
Expand All @@ -107,6 +113,46 @@ test('full replicate', (t) => {
'baseB got baseA\'s localOutput & the unowned core')
})

t.test('3way replicates', async (t) => {
t.plan(1)
const [storeA, baseA] = await create()
const [storeB, baseB] = await create()
const [storeC, baseC] = await create()

const streamA = storeA.replicate(true)
const streamB = storeB.replicate(false)
const streamAforC = storeA.replicate(true)
const streamC = storeC.replicate(false)

const managerA = new AutobaseManager(baseA, () => true,
storeA.get.bind(storeA))
managerA.attachStream(streamA.noiseStream)
managerA.attachStream(streamAforC.noiseStream)
const managerB = new AutobaseManager(baseB, () => true,
storeB.get.bind(storeB))
managerB.attachStream(streamB.noiseStream)
const managerC = new AutobaseManager(baseC, () => true,
storeC.get.bind(storeB))
managerC.attachStream(streamC.noiseStream)

pipeline([
streamAforC,
streamC,
streamAforC
])

pipeline([
streamA,
streamB,
streamA
])

await new Promise((resolve) => { setTimeout(resolve, 100) })
t.deepEqual(baseB.outputs.map((core) => core.key),
[baseB.localOutput, baseA.localOutput, baseC.localOutput].map((core) => core.key),
'baseB got baseC\'s localOutput w/o direct stream')
})

t.test('filters input & output cores w/ allow function', async (t) => {
t.plan(2)
const [storeA, baseA] = await create()
Expand All @@ -129,10 +175,10 @@ test('full replicate', (t) => {
return denyList.indexOf(key) === -1
}

const managerA = new AutobaseManager(streamA.noiseStream, baseA, allow,
storeA.get.bind(storeA))
const managerB = new AutobaseManager(streamB.noiseStream, baseB, allow,
storeB.get.bind(storeB))
const managerA = new AutobaseManager(baseA, allow, storeA.get.bind(storeA))
managerA.attachStream(streamA.noiseStream)
const managerB = new AutobaseManager(baseB, allow, storeB.get.bind(storeB))
managerB.attachStream(streamB.noiseStream)

pipeline([
streamA,
Expand Down
8 changes: 8 additions & 0 deletions utils/set-operations.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
// One-way set difference
export function difference (setA, setB) {
const _difference = new Set(setA)
for (const elem of setB) {
_difference.delete(elem)
}
return _difference
}

0 comments on commit a7aa8ca

Please sign in to comment.