Skip to content

Commit

Permalink
feat: Persist keys in storage to load on construction
Browse files Browse the repository at this point in the history
A keys / cores manager for autobase should be able to load from
persistent storage lest outputs would be calculated on large datasets
when inputs that were previously there disappear on reboot.
  • Loading branch information
lejeunerenard committed Jan 29, 2023
1 parent a7aa8ca commit 60b7c31
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 12 deletions.
76 changes: 75 additions & 1 deletion index.js
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
import Protomux from 'protomux'
import c from 'compact-encoding'
import b4a from 'b4a'
import Hypercore from 'hypercore'
import { difference } from './utils/set-operations.js'

export class AutobaseManager {
constructor (base, allow, get) {
constructor (base, allow, get, storage) {
this.base = base
this.allow = allow
this.get = get
this.storage = Hypercore.defaultStorage(storage)

this._inputKeys = new Set()
this._outputKeys = new Set()
Expand All @@ -19,6 +21,13 @@ export class AutobaseManager {
if (this.base.localOutput) {
this._addKeys([this.base.localOutput.key.toString('hex')], 'output')
}

// Load storage
this._ready = this.readStorageKeys()
}

ready () {
return this._ready
}

attachStream (stream) {
Expand All @@ -38,6 +47,7 @@ export class AutobaseManager {
const newKeys = difference(allowedKeys, self._inputKeys)
if (newKeys.size > 0) {
await self._addKeys(newKeys, 'input')
await self.updateStorageKeys()
}
}
}
Expand All @@ -52,6 +62,7 @@ export class AutobaseManager {
const newKeys = difference(allowedKeys, self._outputKeys)
if (newKeys.size > 0) {
await self._addKeys(newKeys, 'output')
await self.updateStorageKeys()
}
}
}
Expand All @@ -67,6 +78,8 @@ export class AutobaseManager {
}

async announce (stream) {
await this.ready()

const keys = this.base.inputs.map((core) => core.key.toString('hex'))
if (keys.length) {
// console.log('[' + this.base.localOutput.key.toString('hex').slice(-6) +
Expand Down Expand Up @@ -113,4 +126,65 @@ export class AutobaseManager {
}
}
}

_getStorage (file) {
const MANAGER_DIR = 'autobase-manager/'
return this.storage(MANAGER_DIR + file)
}

readStorageKeys () {
return Promise.all([
this._readStorageKey('inputs', this._inputKeys),
this._readStorageKey('outputs', this._outputKeys)
])
}

_readStorageKey (file, output) {
const store = this._getStorage(file)
return new Promise((resolve, reject) => {
store.stat(async (err, stat) => {
if (err) return

const len = stat.size
for (let start = 0; start < len; start += 32) {
await new Promise((resolve2, reject) => {
store.read(start, 32, function (err, buf) {
if (err) throw err

output.add(buf.toString('hex'))
resolve2()
})
})
}

store.close()
resolve()
})
}
)
}

async updateStorageKeys () {
await this._updateStorageKey('inputs', this._inputKeys)
await this._updateStorageKey('outputs', this._outputKeys)
await this.announceAll()
}

async _updateStorageKey (file, input) {
const store = this._getStorage(file)
let i = 0
for (const data of input) {
const start = i * 32
// console.log('write data', data)
await new Promise((resolve, reject) => {
store.write(start, b4a.from(data, 'hex'), (err) => {
if (err) return reject(err)

resolve()
})
})
i++
}
store.close()
}
}
24 changes: 13 additions & 11 deletions tests/basics.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ test('full replicate', (t) => {
const streamB = storeB.replicate(false)

const managerA = new AutobaseManager(baseA, () => true,
storeA.get.bind(storeA))
storeA.get.bind(storeA), storeA.storage)
managerA.attachStream(streamA.noiseStream)
const managerB = new AutobaseManager(baseB, () => true,
storeB.get.bind(storeB))
storeB.get.bind(storeB), storeB.storage)
managerB.attachStream(streamB.noiseStream)

pipeline([
Expand All @@ -65,10 +65,10 @@ test('full replicate', (t) => {
const streamB = storeB.replicate(false)

const managerA = new AutobaseManager(baseA, () => true,
storeA.get.bind(storeA))
storeA.get.bind(storeA), storeA.storage)
managerA.attachStream(streamA.noiseStream)
const managerB = new AutobaseManager(baseB, () => true,
storeB.get.bind(storeB))
storeB.get.bind(storeB), storeB.storage)
managerB.attachStream(streamB.noiseStream)

pipeline([
Expand All @@ -95,10 +95,10 @@ test('full replicate', (t) => {
const streamB = storeB.replicate(false)

const managerA = new AutobaseManager(baseA, () => true,
storeA.get.bind(storeA))
storeA.get.bind(storeA), storeA.storage)
managerA.attachStream(streamA.noiseStream)
const managerB = new AutobaseManager(baseB, () => true,
storeB.get.bind(storeB))
storeB.get.bind(storeB), storeB.storage)
managerB.attachStream(streamB.noiseStream)

pipeline([
Expand All @@ -125,14 +125,14 @@ test('full replicate', (t) => {
const streamC = storeC.replicate(false)

const managerA = new AutobaseManager(baseA, () => true,
storeA.get.bind(storeA))
storeA.get.bind(storeA), storeA.storage)
managerA.attachStream(streamA.noiseStream)
managerA.attachStream(streamAforC.noiseStream)
const managerB = new AutobaseManager(baseB, () => true,
storeB.get.bind(storeB))
storeB.get.bind(storeB), storeB.storage)
managerB.attachStream(streamB.noiseStream)
const managerC = new AutobaseManager(baseC, () => true,
storeC.get.bind(storeB))
storeC.get.bind(storeB), storeC.storage)
managerC.attachStream(streamC.noiseStream)

pipeline([
Expand Down Expand Up @@ -175,9 +175,11 @@ test('full replicate', (t) => {
return denyList.indexOf(key) === -1
}

const managerA = new AutobaseManager(baseA, allow, storeA.get.bind(storeA))
const managerA = new AutobaseManager(baseA, allow, storeA.get.bind(storeA),
storeA.storage)
managerA.attachStream(streamA.noiseStream)
const managerB = new AutobaseManager(baseB, allow, storeB.get.bind(storeB))
const managerB = new AutobaseManager(baseB, allow, storeB.get.bind(storeB),
storeB.storage)
managerB.attachStream(streamB.noiseStream)

pipeline([
Expand Down

0 comments on commit 60b7c31

Please sign in to comment.