-
Notifications
You must be signed in to change notification settings - Fork 20
/
producer.go
131 lines (109 loc) · 3.83 KB
/
producer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package rabbitmq
import "github.com/streadway/amqp"
type Producer struct {
// Base struct for Producer
*RabbitMQ
// The communication channel over connection
channel *amqp.Channel
// A notifiyng channel for publishings
done chan error
// Current producer connection settings
session Session
}
type PublishingOptions struct {
// The key that when publishing a message to a exchange/queue will be only delivered to
// given routing key listeners
RoutingKey string
// Publishing tag
Tag string
// Queue should be on the server/broker
Mandatory bool
// Consumer should be bound to server
Immediate bool
}
// NewProducer is a constructor function for producer creation Accepts Exchange,
// Queue, PublishingOptions. On the other hand we are not declaring our topology
// on both the publisher and consumer to be able to change the settings only in
// one place. We can declare those settings on both place to ensure they are
// same. But this package will not support it.
func (r *RabbitMQ) NewProducer(e Exchange, q Queue, po PublishingOptions) (*Producer, error) {
rmq, err := r.Connect()
if err != nil {
return nil, err
}
// getting a channel
channel, err := r.conn.Channel()
if err != nil {
return nil, err
}
return &Producer{
RabbitMQ: rmq,
channel: channel,
session: Session{
Exchange: e,
Queue: q,
PublishingOptions: po,
},
}, nil
}
// Publish sends a Publishing from the client to an exchange on the server.
func (p *Producer) Publish(publishing amqp.Publishing) error {
e := p.session.Exchange
q := p.session.Queue
po := p.session.PublishingOptions
routingKey := po.RoutingKey
// if exchange name is empty, this means we are gonna publish
// this mesage to a queue, every queue has a binding to default exchange
if e.Name == "" {
routingKey = q.Name
}
err := p.channel.Publish(
e.Name, // publish to an exchange(it can be default exchange)
routingKey, // routing to 0 or more queues
po.Mandatory, // mandatory, if no queue than err
po.Immediate, // immediate, if no consumer than err
publishing,
// amqp.Publishing {
// // Application or exchange specific fields,
// // the headers exchange will inspect this field.
// Headers Table
// // Properties
// ContentType string // MIME content type
// ContentEncoding string // MIME content encoding
// DeliveryMode uint8 // Transient (0 or 1) or Persistent (2)
// Priority uint8 // 0 to 9
// CorrelationId string // correlation identifier
// ReplyTo string // address to to reply to (ex: RPC)
// Expiration string // message expiration spec
// MessageId string // message identifier
// Timestamp time.Time // message timestamp
// Type string // message type name
// UserId string // creating user id - ex: "guest"
// AppId string // creating application id
// // The application specific payload of the message
// Body []byte
// }
)
return err
}
// NotifyReturn captures a message when a Publishing is unable to be
// delivered either due to the `mandatory` flag set
// and no route found, or `immediate` flag set and no free consumer.
func (p *Producer) NotifyReturn(notifier func(message amqp.Return)) {
go func() {
for res := range p.channel.NotifyReturn(make(chan amqp.Return)) {
notifier(res)
}
}()
}
// Shutdown gracefully closes all connections
func (p *Producer) Shutdown() error {
co := p.session.ConsumerOptions
if err := shutdownChannel(p.channel, co.Tag); err != nil {
return err
}
// Since publishing is asynchronous this can happen
// instantly without waiting for a done message.
defer p.RabbitMQ.log.Info("Producer shutdown OK")
return nil
}