Skip to content

Commit

Permalink
refact: Convert to operate on a generic stream & an autobase
Browse files Browse the repository at this point in the history
Previously I wanted to target replicating all cores on a corestore
assuming namespacing would be sufficient to limit the replication scope.
But since these cores need to be specified as inputs & outputs in my use
case, it was better to focus on syncing and adding cores on the
autobase.

The `get` argument allows for any core storage / management in the
application and the generic stream allows for any replication /
transport configuration (as long as its a stream). This support now more
use cases than just corestores.

All outputs and inputs are announced at creation with the option to
re-announce however often the application wants to via `announce()`. In
most cases announcing on creation is sufficient. Changes to existing
peers can be propagated by setting a timeout to announce at regular
intervals or via a more intelligent pattern. Currently there is not a
method to detect when new inputs or outputs are added to an autobase
without setting a interval that then reads the autobase's inputs and
outputs properties and sees if they've changed.

Finally a flexible filter for adding a core was added that provides:
- the key for the core to be added
- whether it is to be added as an output or input
- the session it came from
With this cores can be blacklisted.
  • Loading branch information
lejeunerenard committed Jan 26, 2023
1 parent 2dbe707 commit 800fc63
Show file tree
Hide file tree
Showing 4 changed files with 216 additions and 32 deletions.
68 changes: 57 additions & 11 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,20 +1,66 @@
import Protomux from 'protomux'
import c from 'compact-encoding'
import b4a from 'b4a'

export default function emitNewCores (store, cb, ...args) {
const replicationStream = store.replicate(...args)
export class AutobaseManager {
constructor (stream, base, allow, get) {
this.base = base

const mux = Protomux.from(replicationStream.noiseStream)
const mux = Protomux.from(stream)

const channel = mux.createChannel({ protocol: 'full-send' })
channel.open()
const channel = mux.createChannel({ protocol: 'autobase-manager' })
channel.open()

const message = channel.addMessage({ encoding: c.string })
message.onmessage = cb
this.inputAnnouncer = channel.addMessage({
encoding: c.array(c.string),
async onmessage (msgs, session) {
const allowedKeys = msgs.filter((msg) => allow(msg, 'input', session))
if (allowedKeys.length) {
for (const key of allowedKeys) {
const core = get(b4a.from(key, 'hex'))
await core.ready() // seems necessary for autobase id setup

store.on('core-open', (core) => {
message.send(core.key.toString('hex'))
})
await base.addInput(core)
}
}
}
})

return replicationStream
this.outputAnnouncer = channel.addMessage({
encoding: c.array(c.string),
async onmessage (msgs, session) {
const allowedKeys = msgs.filter((msg) => allow(msg, 'output', session))
if (allowedKeys.length) {
for (const key of allowedKeys) {
const core = get(b4a.from(key, 'hex'))

// Necessary for autobase id (aka the core's id) setup
await core.ready()

// Update output to ensure up to date before adding
// Get a 'Batch is out-of-date.' error otherwise
if (base.started) {
await base.view.update()
}

await base.addOutput(core)
}
}
}
})

if (this.base.localInput || this.base.inputs || this.base.outputs || this.base.localOutput) this.announce()
}

announce () {
const keys = this.base.inputs.map((core) => core.key.toString('hex'))
if (keys.length) {
this.inputAnnouncer.send(keys)
}

const outputKeys = this.base.outputs.map((core) => core.key.toString('hex'))
if (outputKeys.length) {
this.outputAnnouncer.send(outputKeys)
}
}
}
63 changes: 63 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
"tape": "^5.6.3"
},
"dependencies": {
"autobase": "^1.0.0-alpha.8",
"compact-encoding": "^2.11.0",
"corestore": "^6.4.1",
"protomux": "^3.4.1"
Expand Down
116 changes: 95 additions & 21 deletions tests/basics.js
Original file line number Diff line number Diff line change
@@ -1,35 +1,109 @@
import test from 'tape'
import RAM from 'random-access-memory'
import Corestore from 'corestore'
import emitNewCores from '../index.js'
import Autobase from 'autobase'
import { AutobaseManager } from '../index.js'
import { pipeline } from 'streamx'

async function create (storage) {
const store = new Corestore(storage || RAM)
await store.ready()

const core = store.get({ name: 'my-input' })
const coreOut = store.get({ name: 'my-output' })
const base = new Autobase({
inputs: [core],
localInput: core,
outputs: [coreOut],
localOutput: coreOut,
autostart: true,
eagerUpdate: true
})

await base.ready()

return [store, base]
}

test('full replicate', (t) => {
t.test('emits message when a core is added', async (t) => {
t.test('adds localInputs between autobases', async (t) => {
t.plan(1)
const [storeA, baseA] = await create()
const [storeB, baseB] = await create()

const streamA = storeA.replicate(true)
const streamB = storeB.replicate(false)

const managerA = new AutobaseManager(streamA.noiseStream, baseA,
() => true, storeA.get.bind(storeA))
const managerB = new AutobaseManager(streamB.noiseStream, baseB,
() => true, storeB.get.bind(storeB))

pipeline([
streamA,
streamB,
streamA
])

await new Promise((resolve) => { setTimeout(resolve, 100) })
t.deepEqual(baseB.inputs.map((core) => core.key),
[baseB.localInput, baseA.localInput].map((core) => core.key),
'baseB got baseA\'s localinput')
})

const storeA = new Corestore(RAM)
const storeB = new Corestore(RAM)
t.test('adds inputs not own by either autobase but known by one', async (t) => {
t.plan(1)
const [storeA, baseA] = await create()
const [storeB, baseB] = await create()

const falseCore = await storeA.get(Buffer.from('deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef', 'hex'))
await baseA.addInput(falseCore)

const streamA = storeA.replicate(true)
const streamB = storeB.replicate(false)

const managerA = new AutobaseManager(streamA.noiseStream, baseA,
() => true, storeA.get.bind(storeA))
const managerB = new AutobaseManager(streamB.noiseStream, baseB,
() => true, storeB.get.bind(storeB))

pipeline([
streamA,
streamB,
streamA
])

await new Promise((resolve) => { setTimeout(resolve, 100) })
t.deepEqual(baseB.inputs.map((core) => core.key),
[baseB.localInput, baseA.localInput, falseCore].map((core) => core.key),
'baseB got baseA\'s localInput & the unowned ocer')
})

t.test('adds outputs not own by either autobase but known by one', async (t) => {
t.plan(1)
const [storeA, baseA] = await create()
const [storeB, baseB] = await create()

await storeA.ready()
await storeB.ready()
const falseCore = await storeA.get(Buffer.from('deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef', 'hex'))
await baseA.addOutput(falseCore)

let streamA
const streamAProm = new Promise((resolve, reject) => {
streamA = emitNewCores(storeA, (key) => {
t.is(key, expectedKey.toString('hex'))
resolve()
}, true)
})
const streamB = emitNewCores(storeB, (key) => {
t.fail('self announced')
}, false)
const streamA = storeA.replicate(true)
const streamB = storeB.replicate(false)

streamA.pipe(streamB).pipe(streamA)
const managerA = new AutobaseManager(streamA.noiseStream, baseA,
() => true, storeA.get.bind(storeA))
const managerB = new AutobaseManager(streamB.noiseStream, baseB,
() => true, storeB.get.bind(storeB))

const core = await storeB.get({ name: 'beep' })
await core.ready()
const expectedKey = core.key
pipeline([
streamA,
streamB,
streamA
])

await streamAProm
await new Promise((resolve) => { setTimeout(resolve, 100) })
t.deepEqual(baseB.outputs.map((core) => core.key),
[baseB.localOutput, baseA.localOutput, falseCore].map((core) => core.key),
'baseB got baseA\'s localOutput & the unowned core')
})
})

0 comments on commit 800fc63

Please sign in to comment.