diff --git a/index.js b/index.js index 7df87b3..434992b 100644 --- a/index.js +++ b/index.js @@ -5,11 +5,16 @@ import Hypercore from 'hypercore' import { difference } from './utils/set-operations.js' export class AutobaseManager { - constructor (base, allow, get, storage) { + constructor (base, allow, get, storage, opts = {}) { this.base = base this.allow = allow this.get = get this.storage = Hypercore.defaultStorage(storage) + this.id = opts.id || b4a.from('main') + // Ensure id is a buffer for protomux + if (typeof this.id === 'string') { + this.id = b4a.from(this.id) + } this._inputKeys = new Set() this._outputKeys = new Set() @@ -47,7 +52,13 @@ export class AutobaseManager { const mux = Protomux.from(stream) - const channel = mux.createChannel({ protocol: 'autobase-manager' }) + const channel = mux.createChannel({ + protocol: 'autobase-manager', + id: this.id + }) + if (channel === null) { + throw Error('Attempted to attach to a stream with either duplicate or already closed channel. Maybe select a different `id`?') + } channel.open() const inputAnnouncer = channel.addMessage({ @@ -146,8 +157,8 @@ export class AutobaseManager { } _getStorage (file) { - const MANAGER_DIR = 'autobase-manager/' - return this.storage(MANAGER_DIR + file) + const MANAGER_DIR = ['autobase-manager', this.id.toString()].join('/') + return this.storage(MANAGER_DIR + '/' + file) } readStorageKeys () { diff --git a/readme.md b/readme.md index e384ff9..974057a 100644 --- a/readme.md +++ b/readme.md @@ -19,7 +19,9 @@ const manager = new AutobaseManager( // get(key) function to get a hypercore given a key corestore.get.bind(corestore), // Storage for managing autobase keys - corestore.storage) + corestore.storage, + // Options + { id: 'unique-id-per-autobase' }) // Wait until everything is loaded await manager.ready() @@ -35,7 +37,7 @@ swarm.on('connection', (conn) => { ## API -`const manager = new Autobase(base, allow, get, storage)` +`const manager = new Autobase(base, allow, get, storage, opts = {})` Create a new manager given an autobase, allow function, a means of getting a core and a storage for persisting keys distributed to load on start. @@ -54,6 +56,13 @@ core and a storage for persisting keys distributed to load on start. - `storage` is a directory where you want to store managed keys or alternatively you own [abstract-random-access](https://github.com/random-access-storage/abstract-random-access) instance. For example, if using a Corestore, `corestore.storage`. +- `opts` + Options include: + ``` + { + id: Buffer.from('unique-id-per-autobase') // A unique id per set of autobases. If passed a string, it will be automatically converted into a buffer. + } + ``` `await manager.ready()` diff --git a/tests/basics.js b/tests/basics.js index c516b64..e032160 100644 --- a/tests/basics.js +++ b/tests/basics.js @@ -231,4 +231,106 @@ test('full replicate', (t) => { t.deepEqual(managerA._streams, [], 'removes all streams') t.deepEqual(managerB._streams, [], 'removes all streams') }) + + t.test('manager w/ different ids dont collide', async (t) => { + const ID1 = 'id1' + const ID2 = 'id2' + + const [storeA, baseA1] = await create() + const [storeB, baseB1] = await create() + + const coreA2 = storeA.get({ name: 'my-input2' }) + const coreOutA2 = storeA.get({ name: 'my-output2' }) + const baseA2 = new Autobase({ + inputs: [coreA2], + localInput: coreA2, + outputs: [coreOutA2], + localOutput: coreOutA2, + autostart: true, + eagerUpdate: true + }) + await baseA2.ready() + + const coreB2 = storeB.get({ name: 'my-input2' }) + const coreOutB2 = storeB.get({ name: 'my-output2' }) + const baseB2 = new Autobase({ + inputs: [coreB2], + localInput: coreB2, + outputs: [coreOutB2], + localOutput: coreOutB2, + autostart: true, + eagerUpdate: true + }) + await baseB2.ready() + + const streamA = storeA.replicate(true) + const streamB = storeB.replicate(false) + + const managerA1 = new AutobaseManager(baseA1, () => true, + storeA.get.bind(storeA), storeA.storage, { id: ID1 }) + managerA1.attachStream(streamA.noiseStream) + const managerA2 = new AutobaseManager(baseA2, () => true, + storeA.get.bind(storeA), storeA.storage, { id: ID2 }) + managerA2.attachStream(streamA.noiseStream) + + const managerB1 = new AutobaseManager(baseB1, () => true, + storeB.get.bind(storeB), storeB.storage, { id: ID1 }) + managerB1.attachStream(streamB.noiseStream) + const managerB2 = new AutobaseManager(baseB2, () => true, + storeB.get.bind(storeB), storeB.storage, { id: ID2 }) + managerB2.attachStream(streamB.noiseStream) + + pipeline([ + streamA, + streamB, + streamA + ]) + + await new Promise((resolve) => { setTimeout(resolve, 100) }) + + t.deepEqual(baseB1.inputs.map((core) => core.key), + [baseB1.localInput, baseA1.localInput].map((core) => core.key), + 'baseB1 got baseA1\'s inputs') + t.deepEqual(baseA2.outputs.map((core) => core.key), + [baseA2.localOutput, baseB2.localOutput].map((core) => core.key), + 'baseA2 got baseB2\'s outputs') + t.notDeepEqual(baseA1.inputs.map((core) => core.key), + baseA2.inputs.map((core) => core.key), + 'baseA1 did not sync baseA2\'s inputs') + t.notDeepEqual(baseB1.outputs.map((core) => core.key), + baseB2.outputs.map((core) => core.key), + 'baseB1 did not sync baseB2\'s outputs') + t.end() + }) + + t.test('manager w/ same ids throws error', async (t) => { + const ID1 = 'id1' + + const [storeA, baseA1] = await create() + + const coreA2 = storeA.get({ name: 'my-input2' }) + const coreOutA2 = storeA.get({ name: 'my-output2' }) + const baseA2 = new Autobase({ + inputs: [coreA2], + localInput: coreA2, + outputs: [coreOutA2], + localOutput: coreOutA2, + autostart: true, + eagerUpdate: true + }) + await baseA2.ready() + + const streamA = storeA.replicate(true) + + const managerA1 = new AutobaseManager(baseA1, () => true, + storeA.get.bind(storeA), storeA.storage, { id: ID1 }) + managerA1.attachStream(streamA.noiseStream) + const managerA2 = new AutobaseManager(baseA2, () => true, + storeA.get.bind(storeA), storeA.storage, { id: ID1 }) + t.throws(() => managerA2.attachStream(streamA.noiseStream), + /Attempted to attach to a stream with either duplicate or already closed channel/, + 'throws error about colliding ids') + + t.end() + }) })