Skip to content

Commit

Permalink
feat: Add id option to support multiple autobases on same stream
Browse files Browse the repository at this point in the history
Thanks again to @railgun on hypercore-protocol discord for the
suggestion.
  • Loading branch information
lejeunerenard committed Mar 12, 2023
1 parent 61f37b6 commit 9b3e119
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 6 deletions.
19 changes: 15 additions & 4 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,16 @@ import Hypercore from 'hypercore'
import { difference } from './utils/set-operations.js'

export class AutobaseManager {
constructor (base, allow, get, storage) {
constructor (base, allow, get, storage, opts = {}) {
this.base = base
this.allow = allow
this.get = get
this.storage = Hypercore.defaultStorage(storage)
this.id = opts.id || b4a.from('main')
// Ensure id is a buffer for protomux
if (typeof this.id === 'string') {
this.id = b4a.from(this.id)
}

this._inputKeys = new Set()
this._outputKeys = new Set()
Expand Down Expand Up @@ -47,7 +52,13 @@ export class AutobaseManager {

const mux = Protomux.from(stream)

const channel = mux.createChannel({ protocol: 'autobase-manager' })
const channel = mux.createChannel({
protocol: 'autobase-manager',
id: this.id
})
if (channel === null) {
throw Error('Attempted to attach to a stream with either duplicate or already closed channel. Maybe select a different `id`?')
}
channel.open()

const inputAnnouncer = channel.addMessage({
Expand Down Expand Up @@ -146,8 +157,8 @@ export class AutobaseManager {
}

_getStorage (file) {
const MANAGER_DIR = 'autobase-manager/'
return this.storage(MANAGER_DIR + file)
const MANAGER_DIR = ['autobase-manager', this.id.toString()].join('/')
return this.storage(MANAGER_DIR + '/' + file)
}

readStorageKeys () {
Expand Down
13 changes: 11 additions & 2 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ const manager = new AutobaseManager(
// get(key) function to get a hypercore given a key
corestore.get.bind(corestore),
// Storage for managing autobase keys
corestore.storage)
corestore.storage,
// Options
{ id: 'unique-id-per-autobase' })

// Wait until everything is loaded
await manager.ready()
Expand All @@ -35,7 +37,7 @@ swarm.on('connection', (conn) => {

## API

`const manager = new Autobase(base, allow, get, storage)`
`const manager = new Autobase(base, allow, get, storage, opts = {})`

Create a new manager given an autobase, allow function, a means of getting a
core and a storage for persisting keys distributed to load on start.
Expand All @@ -54,6 +56,13 @@ core and a storage for persisting keys distributed to load on start.
- `storage` is a directory where you want to store managed keys or alternatively
you own [abstract-random-access](https://github.com/random-access-storage/abstract-random-access)
instance. For example, if using a Corestore, `corestore.storage`.
- `opts`
Options include:
```
{
id: Buffer.from('unique-id-per-autobase') // A unique id per set of autobases. If passed a string, it will be automatically converted into a buffer.
}
```

`await manager.ready()`

Expand Down
102 changes: 102 additions & 0 deletions tests/basics.js
Original file line number Diff line number Diff line change
Expand Up @@ -231,4 +231,106 @@ test('full replicate', (t) => {
t.deepEqual(managerA._streams, [], 'removes all streams')
t.deepEqual(managerB._streams, [], 'removes all streams')
})

t.test('manager w/ different ids dont collide', async (t) => {
const ID1 = 'id1'
const ID2 = 'id2'

const [storeA, baseA1] = await create()
const [storeB, baseB1] = await create()

const coreA2 = storeA.get({ name: 'my-input2' })
const coreOutA2 = storeA.get({ name: 'my-output2' })
const baseA2 = new Autobase({
inputs: [coreA2],
localInput: coreA2,
outputs: [coreOutA2],
localOutput: coreOutA2,
autostart: true,
eagerUpdate: true
})
await baseA2.ready()

const coreB2 = storeB.get({ name: 'my-input2' })
const coreOutB2 = storeB.get({ name: 'my-output2' })
const baseB2 = new Autobase({
inputs: [coreB2],
localInput: coreB2,
outputs: [coreOutB2],
localOutput: coreOutB2,
autostart: true,
eagerUpdate: true
})
await baseB2.ready()

const streamA = storeA.replicate(true)
const streamB = storeB.replicate(false)

const managerA1 = new AutobaseManager(baseA1, () => true,
storeA.get.bind(storeA), storeA.storage, { id: ID1 })
managerA1.attachStream(streamA.noiseStream)
const managerA2 = new AutobaseManager(baseA2, () => true,
storeA.get.bind(storeA), storeA.storage, { id: ID2 })
managerA2.attachStream(streamA.noiseStream)

const managerB1 = new AutobaseManager(baseB1, () => true,
storeB.get.bind(storeB), storeB.storage, { id: ID1 })
managerB1.attachStream(streamB.noiseStream)
const managerB2 = new AutobaseManager(baseB2, () => true,
storeB.get.bind(storeB), storeB.storage, { id: ID2 })
managerB2.attachStream(streamB.noiseStream)

pipeline([
streamA,
streamB,
streamA
])

await new Promise((resolve) => { setTimeout(resolve, 100) })

t.deepEqual(baseB1.inputs.map((core) => core.key),
[baseB1.localInput, baseA1.localInput].map((core) => core.key),
'baseB1 got baseA1\'s inputs')
t.deepEqual(baseA2.outputs.map((core) => core.key),
[baseA2.localOutput, baseB2.localOutput].map((core) => core.key),
'baseA2 got baseB2\'s outputs')
t.notDeepEqual(baseA1.inputs.map((core) => core.key),
baseA2.inputs.map((core) => core.key),
'baseA1 did not sync baseA2\'s inputs')
t.notDeepEqual(baseB1.outputs.map((core) => core.key),
baseB2.outputs.map((core) => core.key),
'baseB1 did not sync baseB2\'s outputs')
t.end()
})

t.test('manager w/ same ids throws error', async (t) => {
const ID1 = 'id1'

const [storeA, baseA1] = await create()

const coreA2 = storeA.get({ name: 'my-input2' })
const coreOutA2 = storeA.get({ name: 'my-output2' })
const baseA2 = new Autobase({
inputs: [coreA2],
localInput: coreA2,
outputs: [coreOutA2],
localOutput: coreOutA2,
autostart: true,
eagerUpdate: true
})
await baseA2.ready()

const streamA = storeA.replicate(true)

const managerA1 = new AutobaseManager(baseA1, () => true,
storeA.get.bind(storeA), storeA.storage, { id: ID1 })
managerA1.attachStream(streamA.noiseStream)
const managerA2 = new AutobaseManager(baseA2, () => true,
storeA.get.bind(storeA), storeA.storage, { id: ID1 })
t.throws(() => managerA2.attachStream(streamA.noiseStream),
/Attempted to attach to a stream with either duplicate or already closed channel/,
'throws error about colliding ids')

t.end()
})
})

0 comments on commit 9b3e119

Please sign in to comment.