forked from GraphQLCollege/graphql-postgres-subscriptions
-
Notifications
You must be signed in to change notification settings - Fork 5
/
postgres-pubsub.js
138 lines (125 loc) · 4.33 KB
/
postgres-pubsub.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
const { PubSub } = require("graphql-subscriptions");
const pgListen = require("pg-listen");
const {
eventEmitterAsyncIterator
} = require("./event-emitter-to-async-iterator");
const defaultCommonMessageHandler = message => message;
class PostgresPubSub extends PubSub {
constructor(options = {}) {
const { commonMessageHandler, ...pgOptions } = options;
super();
const pgListenOptions = {
native: options.native,
paranoidChecking: options.paranoidChecking,
retryInterval: options.retryInterval,
retryLimit: options.retryLimit,
retryTimeout: options.retryTimeout,
parse: options.parse,
serialize: options.serialize,
}
this.pgListen = pgListen(pgOptions, pgListenOptions);
this.triggers = (pgOptions.topics || []).concat(['error']);
this.ee = this.pgListen.notifications;
this.events = this.pgListen.events;
this.subscriptions = {};
this.subIdCounter = 0;
this.commonMessageHandler = commonMessageHandler || defaultCommonMessageHandler;
this.connected = false;
}
/**
* @returns
* Rejects when any of the following occur:
* 1. pg-listen's initial `connect` fails for an exotic (i.e., non-ECONNREFUSED)
* reason.
* 2. pg-listen emits 'error', likely indicating initial connection failed
* even after repeated attempts.
* 3. Connection to the database was successful, but at least one
* `LISTEN` query failed.
*
* Fulfills otherwise, indicating all of the requested triggers are now being
* listened to.
*/
async connect() {
// These event listeners must be added prior to calling pg-listen's
// `connect`, who may emit these events.
const connectedAndListening = new Promise((resolve, reject) => {
this.pgListen.events.once('connected', () => {
this.initTopics(this.triggers).then(resolve, reject);
});
});
const errorThrown = new Promise((_, reject) => {
this.pgListen.events.once('error', reject);
})
try {
await this.pgListen.connect();
} catch (e) {
if (!e.message.includes('ECONNREFUSED')) throw e;
}
await Promise.race([connectedAndListening, errorThrown]);
this.connected = true;
}
initTopics(triggers) {
// confusingly, `pgListen.connect()` will reject if the first connection attempt fails
// but then it will retry and emit a `connected` event if it later connects
// see https://github.com/andywer/pg-listen/issues/32
// so we put logic on the `connected` event
return Promise.all(triggers.map((eventName) => {
return this.pgListen.listenTo(eventName);
}));
}
async publish(triggerName, payload) {
if (!this.connected) {
const message = `attempted to publish a ${triggerName} event via pubsub, but client is not yet connected`;
return Promise.reject(new Error(message));
}
await this.pgListen.notify(triggerName, payload);
return true;
}
async subscribe(triggerName, onMessage) {
const callback = message => {
onMessage(
message instanceof Error
? message
: this.commonMessageHandler(message)
);
};
await this.pgListen.listenTo(triggerName);
this.pgListen.notifications.on(triggerName, callback);
this.subIdCounter = this.subIdCounter + 1;
this.subscriptions[this.subIdCounter] = [triggerName, callback];
return Promise.resolve(this.subIdCounter);
}
async unsubscribe(subId) {
if (!this.connected) {
console.log('attempted to unsubscribe to events via pubsub, but client is not yet connected')
}
const [triggerName, onMessage] = this.subscriptions[subId];
delete this.subscriptions[subId];
this.pgListen.unlisten(triggerName);
}
async close() {
await this.pgListen.unlistenAll();
await this.pgListen.close();
this.connected = false;
}
/*
* The difference between this function and asyncIterator is that the
* topics can still be empty.
*/
async asyncIteratorPromised(triggers) {
await this.initTopics(Array.isArray(triggers) ? triggers : [triggers] );
return eventEmitterAsyncIterator(
this.pgListen,
triggers,
this.commonMessageHandler
);
}
asyncIterator(triggers) {
return eventEmitterAsyncIterator(
this.pgListen,
triggers,
this.commonMessageHandler
);
}
}
module.exports = { PostgresPubSub };