-
Notifications
You must be signed in to change notification settings - Fork 139
/
rabbitmq-service.js
55 lines (46 loc) · 1.55 KB
/
rabbitmq-service.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
const amqlib = require('amqplib')
class RabbitMQ {
constructor() {
this.url = `amqp://${process.env.RABBITMQ_LOGIN}:${process.env.RABBITMQ_PASSWORD}@${process.env.RABBITMQ_HOST}:${process.env.RABBITMQ_PORT}/${process.env.RABBITMQ_VHOST}`
this.connection = null
this.channel = null
}
async connect() {
if (!this.connection) this.connection = await amqlib.connect(this.url)
if (!this.channel) this.channel = await this.connection.createChannel()
this.channel.prefetch(1)
}
async send(queue, message) {
try {
if (this.channel) {
this.channel.assertQueue(queue, { durable: true, queueMode: 'lazy' })
this.channel.sendToQueue(queue, Buffer.from(JSON.stringify(message), 'utf-8'), { persistent: true })
}
}
catch (error) {
console.log(error.message)
}
}
async consume(queue, callback) {
try {
if (this.channel) {
this.channel.assertQueue(queue, { durable: true, queueMode: 'lazy' })
this.channel.consume(queue, callback, { noAck: true })
}
}
catch (error) {
console.log(error.message)
}
}
}
class RabbitMQService {
static async getInstance() {
if (!RabbitMQService.instance) {
let instance = new RabbitMQ()
await instance.connect()
RabbitMQService.instance = instance
}
return RabbitMQService.instance
}
}
module.exports = RabbitMQService