diff --git a/patch-channel-store-methods.js b/patch-channel-store-methods.js index 85af947..d37fa6f 100644 --- a/patch-channel-store-methods.js +++ b/patch-channel-store-methods.js @@ -1,5 +1,7 @@ console.log('PATCH-CHANNEL-STORE-METHODS'); +const ReflectApply = Reflect.apply; + module.exports = function (dc) { const channels = new WeakSet(); @@ -10,18 +12,63 @@ module.exports = function (dc) { if (channels.has(ch)) return ch; - ch.bindStore = function() { - // TODO + ch._stores = new Map(); + + ch.bindStore = function(store, transform) { + // const replacing = this._stores.has(store); + // if (!replacing) channels.incRef(this.name); + this._stores.set(store, transform); }; - ch.unbindStore = function() { - // TODO + ch.unbindStore = function(store) { + if (!this._stores.has(store)) { + return false; + } + + this._stores.delete(store); + + // channels.decRef(this.name); + // maybeMarkInactive(this); + + return true; }; - ch.runStores = function() { - // TODO + ch.runStores = function(data, fn, thisArg, ...args) { + let run = () => { + this.publish(data); + return ReflectApply(fn, thisArg, args); + }; + + for (const entry of this._stores.entries()) { + const store = entry[0]; + const transform = entry[1]; + run = wrapStoreRun(store, data, run, transform); + } + + return run(); }; return ch; }; }; + +function wrapStoreRun(store, data, next, transform = defaultTransform) { + return () => { + let context; + try { + context = transform(data); + } catch (err) { + process.nextTick(() => { + // triggerUncaughtException(err, false); + throw err; + }); + return next(); + } + + return store.run(context, next); + }; +} + +function defaultTransform(data) { + return data; +} diff --git a/test/common.js b/test/common.js index b0e546f..915c135 100644 --- a/test/common.js +++ b/test/common.js @@ -1,6 +1,7 @@ const assert = require('assert'); const { inspect } = require('util'); +const noop = () => {}; function getCallSite(top) { const originalStackFormatter = Error.prepareStackTrace; diff --git a/test/test-diagnostics-channel-bind-store.spec.js b/test/test-diagnostics-channel-bind-store.spec.js new file mode 100644 index 0000000..eb90489 --- /dev/null +++ b/test/test-diagnostics-channel-bind-store.spec.js @@ -0,0 +1,115 @@ +'use strict'; + +const test = require('tape'); +const common = require('./common.js'); +const dc = require('../dc-polyfill.js'); +const { AsyncLocalStorage } = require('async_hooks'); + +test('test-diagnostics-channel-bind-store.spec', t => { + let n = 0; + const thisArg = new Date(); + const inputs = [ + { foo: 'bar' }, + { baz: 'buz' }, + ]; + + const channel = dc.channel('test-diagnostics-channel-bind-store.spec'); + + // Bind a storage directly to published data + const store1 = new AsyncLocalStorage(); + channel.bindStore(store1); + let store1bound = true; + + // Bind a store with transformation of published data + const store2 = new AsyncLocalStorage(); + channel.bindStore(store2, common.mustCall((data) => { + t.strictEqual(data, inputs[n]); + return { data }; + }, 4)); + + // Regular subscribers should see publishes from runStores calls + channel.subscribe(common.mustCall((data) => { + if (store1bound) { + t.deepEqual(data, store1.getStore()); + } + t.deepEqual({ data }, store2.getStore()); + t.strictEqual(data, inputs[n]); + }, 4)); + + // Verify stores are empty before run + t.strictEqual(store1.getStore(), undefined); + t.strictEqual(store2.getStore(), undefined); + + channel.runStores(inputs[n], common.mustCall(function(a, b) { + // Verify this and argument forwarding + t.strictEqual(this, thisArg); + t.strictEqual(a, 1); + t.strictEqual(b, 2); + + // Verify store 1 state matches input + t.strictEqual(store1.getStore(), inputs[n]); + + // Verify store 2 state has expected transformation + t.deepEqual(store2.getStore(), { data: inputs[n] }); + + // Should support nested contexts + n++; + channel.runStores(inputs[n], common.mustCall(function() { + // Verify this and argument forwarding + // TODO: skipping this test as `this` is equal to `global` + // might be a bug with mustCall? or a bug with wrapping Node's test in a tape function? + // t.strictEqual(this, undefined); + + // Verify store 1 state matches input + t.strictEqual(store1.getStore(), inputs[n]); + + // Verify store 2 state has expected transformation + t.deepEqual(store2.getStore(), { data: inputs[n] }); + })); + n--; + + // Verify store 1 state matches input + t.strictEqual(store1.getStore(), inputs[n]); + + // Verify store 2 state has expected transformation + t.deepEqual(store2.getStore(), { data: inputs[n] }); + }), thisArg, 1, 2); + + // Verify stores are empty after run + t.strictEqual(store1.getStore(), undefined); + t.strictEqual(store2.getStore(), undefined); + + // Verify unbinding works + t.ok(channel.unbindStore(store1)); + store1bound = false; + + // Verify unbinding a store that is not bound returns false + t.ok(!channel.unbindStore(store1)); + + n++; + channel.runStores(inputs[n], common.mustCall(() => { + // Verify after unbinding store 1 will remain undefined + t.strictEqual(store1.getStore(), undefined); + + // Verify still bound store 2 receives expected data + t.deepEqual(store2.getStore(), { data: inputs[n] }); + })); + + // Contain transformer errors and emit on next tick + const fail = new Error('fail'); + channel.bindStore(store1, () => { + throw fail; + }); + + let calledRunStores = false; + process.once('uncaughtException', common.mustCall((err) => { + t.strictEqual(calledRunStores, true); + t.strictEqual(err, fail); + setImmediate(() => { + t.end(); + }); + })); + + channel.runStores(inputs[n], common.mustCall()); + calledRunStores = true; +});