Skip to content

Commit

Permalink
Merge openebs#945
Browse files Browse the repository at this point in the history
945: fix(moac): volume gets stuck in offline state when doing failover r=jkryl a=jkryl

The problem was that the mayastor unregistered itself and node object
was removed from moac's internal state before all events related to
offlining the node could be processed. One of the previous complaints
about msn records was that they disappear whenever the storage node
is rebooted which is confusing for users. This fix is addressing both
issues at once. Instead of removing the node upon deregistration, it
disconnects the grpc client and switches the node to offline state.
So it stays around along with all other objects that might be on the
node (pools, replicas, nexuses) and events can be processed as
normally. The obvious downside is that users have to remove msn
records manually in case they know that the storage node will never
join the cluster again. But that's just a small inconvenience
compared to what this brings.

A note for insiders. The way how you can tell the difference between
offline node that is unreachable and offline node that unregistered
itself is by looking at grpcEndpoint in msn spec. It will be empty
string for the later.

Resolves: CAS-954

Co-authored-by: Jan Kryl <[email protected]>
  • Loading branch information
mayastor-bors and Jan Kryl committed Jun 15, 2021
2 parents ee49b47 + 4893638 commit 7f5d5a1
Show file tree
Hide file tree
Showing 8 changed files with 104 additions and 44 deletions.
2 changes: 1 addition & 1 deletion csi/moac/src/nats.ts
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ export class MessageBus {
return;
}
log.trace(`"${id}" requested deregistration`);
this.registry.removeNode(id);
this.registry.disconnectNode(id);
}

_subscribe () {
Expand Down
7 changes: 6 additions & 1 deletion csi/moac/src/node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ export class Node extends events.EventEmitter {
log.info(
`mayastor endpoint on node "${this.name}" changed from "${this.endpoint}" to "${endpoint}"`
);
this.emit('node', {
eventType: 'mod',
object: this
});
this.client.close();
if (this.syncTimer) {
clearTimeout(this.syncTimer);
Expand All @@ -118,10 +122,11 @@ export class Node extends events.EventEmitter {

// Close the grpc connection
disconnect() {
if (!this.client) return;
log.info(`mayastor on node "${this.name}" is gone`);
assert(this.client);
this.client.close();
this.client = null;
this.endpoint = null;
if (this.syncTimer) {
clearTimeout(this.syncTimer);
this.syncTimer = null;
Expand Down
10 changes: 5 additions & 5 deletions csi/moac/src/node_operator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ export class NodeResource extends CustomResource {
throw new Error('missing spec');
} else {
let grpcEndpoint = (cr.spec as any).grpcEndpoint;
if (grpcEndpoint === undefined) {
throw new Error('missing grpc endpoint in spec');
if (!grpcEndpoint) {
grpcEndpoint = '';
}
this.spec = { grpcEndpoint };
}
Expand Down Expand Up @@ -129,7 +129,7 @@ export class NodeOperator {
//
_bindWatcher (watcher: CustomResourceCache<NodeResource>) {
watcher.on('new', (obj: NodeResource) => {
if (obj.metadata) {
if (obj.metadata && obj.spec.grpcEndpoint) {
this.registry.addNode(obj.metadata.name, obj.spec.grpcEndpoint);
}
});
Expand All @@ -155,7 +155,7 @@ export class NodeOperator {
async _onNodeEvent (ev: any) {
const name = ev.object.name;
if (ev.eventType === 'new') {
const grpcEndpoint = ev.object.endpoint;
const grpcEndpoint = ev.object.endpoint || '';
let origObj = this.watcher.get(name);
if (origObj === undefined) {
await this._createResource(name, grpcEndpoint);
Expand All @@ -167,7 +167,7 @@ export class NodeOperator {
ev.object.isSynced() ? NodeState.Online : NodeState.Offline,
);
} else if (ev.eventType === 'mod') {
const grpcEndpoint = ev.object.endpoint;
const grpcEndpoint = ev.object.endpoint || '';
let origObj = this.watcher.get(name);
// The node might be just going away - do nothing if not in the cache
if (origObj !== undefined) {
Expand Down
31 changes: 19 additions & 12 deletions csi/moac/src/registry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,7 @@ export class Registry extends events.EventEmitter {
addNode (name: string, endpoint: string) {
let node = this.nodes[name];
if (node) {
// if grpc endpoint has not changed, then this will not do anything
if (node.endpoint !== endpoint) {
node.connect(endpoint);
this.emit('node', {
eventType: 'mod',
object: node
});
}
node.connect(endpoint);
} else {
node = new this.Node(name, this.nodeOpts);
node.connect(endpoint);
Expand Down Expand Up @@ -101,22 +94,36 @@ export class Registry extends events.EventEmitter {
});
}

// Disconnect the node and offline it (but keep it in the list).
//
// @param name Name of the node to offline.
disconnectNode (name: string) {
const node = this.nodes[name];
if (!node) return;
log.info(`mayastor on node "${name}" left`);
node.disconnect();
}

// Remove mayastor node from the list of nodes and unsubscribe events.
//
// @param {string} name Name of the node to remove.
// @param name Name of the node to remove.
removeNode (name: string) {
const node = this.nodes[name];
if (!node) return;
delete this.nodes[name];
node.disconnect();
node.unbind();

log.info(`mayastor on node "${name}" left`);
// There is a hidden problem here. Some actions that should have been
// done on behalf of node.disconnect() above, might not have sufficient time
// to run and after we would remove the node from list of the nodes and
// unsubscribe event subscribers, further event propagation on this node
// would stop. As a workaround we never remove the node unless we are
// shutting down the moac. Users can remove a node by kubectl if they wish.
node.unbind();
this.emit('node', {
eventType: 'del',
object: node
});

eventObjects.forEach((objType) => {
node.removeAllListeners(objType);
});
Expand Down
6 changes: 4 additions & 2 deletions csi/moac/test/nats_test.js
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ module.exports = function () {
nc = null;
}
stopNats();
registry.close();
});

it('should connect to the nats server', async () => {
Expand Down Expand Up @@ -148,9 +149,10 @@ module.exports = function () {
id: 'v0/deregister',
data: { id: NODE_NAME }
})));
expect(registry.getNode(NODE_NAME).isSynced());
await waitUntil(async () => {
return !registry.getNode(NODE_NAME);
}, 1000, 'node removal');
return !registry.getNode(NODE_NAME).isSynced();
}, 1000, 'node offline');
});

it('should disconnect from the nats server', () => {
Expand Down
44 changes: 42 additions & 2 deletions csi/moac/test/node_operator_test.js
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,12 @@ module.exports = function () {
expect(obj.status).to.equal('unknown');
});

it('should not create node resource without grpc endpoint', () => {
expect(() => createNodeResource(NAME)).to.throw();
// empty endpoint means that the node has unregistered itself
it('should create node resource with empty grpc endpoint', () => {
const obj = createNodeResource(NAME, '', 'offline');
expect(obj.metadata.name).to.equal(NAME);
expect(obj.spec.grpcEndpoint).to.equal('');
expect(obj.status).to.equal('offline');
});
});

Expand Down Expand Up @@ -175,6 +179,14 @@ module.exports = function () {
sinon.assert.calledWith(addNodeSpy, NAME, ENDPOINT);
});

it('should not add node to registry if endpoint is empty', async () => {
const addNodeSpy = sinon.spy(registry, 'addNode');
nodeResource.spec.grpcEndpoint = '';
oper.watcher.emit('new', nodeResource);
await sleep(EVENT_PROPAGATION_DELAY);
sinon.assert.notCalled(addNodeSpy);
});

it('should remove node from registry upon "del" event', async () => {
// create registry with a node
const node = new Node(NAME);
Expand Down Expand Up @@ -354,6 +366,34 @@ module.exports = function () {
sinon.assert.notCalled(stubs.delete);
});

it('should update spec and status of the resource upon "mod" node event', async () => {
let stubs;
const nodeResource = createNodeResource(NAME, ENDPOINT, 'online');
mockCache(oper.watcher, (arg) => {
stubs = arg;
stubs.get.returns(nodeResource);
});
await oper.start();
// give time to registry to install its callbacks
await sleep(EVENT_PROPAGATION_DELAY);
registry.addNode(NAME, ENDPOINT);
await sleep(EVENT_PROPAGATION_DELAY);
const node = registry.getNode(NAME);
const isSyncedStub = sinon.stub(node, 'isSynced');
isSyncedStub.returns(false);
node.disconnect();
await sleep(EVENT_PROPAGATION_DELAY);

sinon.assert.notCalled(stubs.create);
sinon.assert.calledOnce(stubs.update);
expect(stubs.update.args[0][5].metadata.name).to.equal(NAME);
expect(stubs.update.args[0][5].spec.grpcEndpoint).to.equal('');
sinon.assert.calledOnce(stubs.updateStatus);
expect(stubs.updateStatus.args[0][5].metadata.name).to.equal(NAME);
expect(stubs.updateStatus.args[0][5].status).to.equal('offline');
sinon.assert.notCalled(stubs.delete);
});

it('should not crash if PUT fails upon "mod" node event', async () => {
let stubs;
const nodeResource = createNodeResource(NAME, ENDPOINT, 'online');
Expand Down
11 changes: 11 additions & 0 deletions csi/moac/test/node_stub.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,23 @@ class NodeStub extends Node {

connect (endpoint) {
this.syncFailed = 0;
if (this.endpoint === endpoint) {
// nothing changed
return;
} else if (this.endpoint) {
this.emit('node', {
eventType: 'mod',
object: this
});
}
this.endpoint = endpoint;
}

disconnect () {
this.syncFailed = this.syncBadLimit + 1;
this.endpoint = null;
this.client = null;
this._offline();
}
}

Expand Down
37 changes: 16 additions & 21 deletions csi/moac/test/registry_test.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ module.exports = function () {
// ensure the events from the node are relayed by the registry
const events = ['node', 'pool', 'replica', 'nexus'];
events.forEach((ev) => {
registry.once(ev, () => {
registry.on(ev, () => {
const idx = events.findIndex((ent) => ent === ev);
expect(idx).to.not.equal(-1);
events.splice(idx, 1);
Expand All @@ -47,39 +47,34 @@ module.exports = function () {
it('should not do anything if the same node already exists in the registry', () => {
const registry = new Registry({});
registry.Node = Node;
const node = new Node('node');
node.connect('127.0.0.1:123');
const connectStub = sinon.stub(node, 'connect');
registry.nodes.node = node;

let nodeEvent;
registry.once('node', (ev) => {
nodeEvent = ev;
const nodeEvents = [];
registry.on('node', (ev) => {
nodeEvents.push(ev);
});

registry.addNode('node', '127.0.0.1:123');
sinon.assert.notCalled(connectStub);
expect(nodeEvent).to.be.undefined;
expect(nodeEvents).to.have.lengthOf(1);
expect(nodeEvents[0].eventType).to.equal('new');

registry.addNode('node', '127.0.0.1:123');
expect(nodeEvents).to.have.lengthOf(1);
});

it('should reconnect node if it exists but grpc endpoint has changed', () => {
const registry = new Registry({});
registry.Node = Node;
const node = new Node('node');
node.connect('127.0.0.1:123');
const connectStub = sinon.stub(node, 'connect');
registry.nodes.node = node;

let nodeEvent;
registry.once('node', (ev) => {
nodeEvent = ev;
const nodeEvents = [];
registry.on('node', (ev) => {
nodeEvents.push(ev);
});

registry.addNode('node', '127.0.0.1:123');
registry.addNode('node', '127.0.0.1:124');
sinon.assert.calledOnce(connectStub);
sinon.assert.calledWith(connectStub, '127.0.0.1:124');
expect(nodeEvent.eventType).to.equal('mod');
expect(nodeEvent.object.name).to.equal('node');
expect(nodeEvents).to.have.lengthOf(2);
expect(nodeEvents[0].eventType).to.equal('new');
expect(nodeEvents[1].eventType).to.equal('mod');
});

it('should get a list of nodes from registry', () => {
Expand Down

0 comments on commit 7f5d5a1

Please sign in to comment.