-
Notifications
You must be signed in to change notification settings - Fork 0
/
cleanup_visibility.go
134 lines (117 loc) · 3.24 KB
/
cleanup_visibility.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package sqsconsumer
import (
"sync"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/sqs"
"golang.org/x/net/context"
)
type visibilityExtenderQueue struct {
queue chan *sqs.Message
sync.Mutex
entries []*sqs.Message
svc *SQSService
extensionSecs int64
ticker <-chan time.Time
}
// NewBatchBatchVisibilityExtender starts a batch visibility extender routine that extends visibilty on messages until they are sent to the returned channel
func NewBatchVisibilityExtender(ctx context.Context, s *SQSService, ticker <-chan time.Time, extensionSecs int64, pending []*sqs.Message) chan<- *sqs.Message {
entries := make([]*sqs.Message, len(pending))
copy(entries, pending)
veq := &visibilityExtenderQueue{
svc: s,
ticker: ticker,
extensionSecs: extensionSecs,
queue: make(chan *sqs.Message, len(pending)),
entries: entries,
}
go veq.start(ctx)
return veq.queue
}
func (veq *visibilityExtenderQueue) removeFromPending(msg *sqs.Message) {
veq.Lock()
defer veq.Unlock()
for i, e := range veq.entries {
if *msg.MessageId == *e.MessageId {
veq.entries = append(veq.entries[:i], veq.entries[i+1:]...)
return
}
}
}
// extendBatch extends the visibility of a batch of messages and returns the list of messages that failed to extend and an error for overall failure.
func (veq *visibilityExtenderQueue) extendBatch(msgs []*sqs.Message) ([]*sqs.Message, error) {
if len(msgs) == 0 {
return nil, nil
}
var entries []*sqs.ChangeMessageVisibilityBatchRequestEntry
for _, m := range msgs {
veq.svc.Logger("Extending visibility timeout for message %s", aws.StringValue(m.MessageId))
entries = append(entries, &sqs.ChangeMessageVisibilityBatchRequestEntry{
Id: m.MessageId,
ReceiptHandle: m.ReceiptHandle,
VisibilityTimeout: aws.Int64(veq.extensionSecs),
})
}
p := &sqs.ChangeMessageVisibilityBatchInput{
QueueUrl: veq.svc.URL,
Entries: entries,
}
resp, err := veq.svc.Svc.ChangeMessageVisibilityBatch(p)
if err != nil {
return msgs, err
}
var failed []*sqs.Message
for _, f := range resp.Failed {
for _, m := range msgs {
if *m.MessageId == *f.Id {
failed = append(failed, m)
break
}
}
}
return failed, nil
}
// extendPending extends all the pending messages
func (veq *visibilityExtenderQueue) extendPending() {
veq.Lock()
defer veq.Unlock()
fails, err := veq.extendBatch(veq.entries)
if err != nil {
veq.svc.Logger("Error extending batch: %s", err)
}
var retries uint
for len(fails) > 0 && retries <= 5 {
retries++
time.Sleep(time.Duration((2<<retries)*50) * time.Millisecond)
fails, err = veq.extendBatch(fails)
if err != nil {
veq.svc.Logger("Error extending batch: %s", err)
}
}
}
func (veq *visibilityExtenderQueue) start(ctx context.Context) {
// read from the queue to remove pending items, and extending visibility for all still spending messages periodically
for {
select {
case <-ctx.Done():
// drain the queue and return
for {
select {
case <-veq.queue:
default:
return
}
}
case msg := <-veq.queue:
veq.removeFromPending(msg)
case <-veq.ticker:
veq.extendPending()
}
veq.Lock()
n := len(veq.entries)
veq.Unlock()
if n == 0 {
return
}
}
}