From 15d2588c2de0021236435774aeb7c887375047e4 Mon Sep 17 00:00:00 2001 From: absolute8511 Date: Fri, 31 May 2024 16:43:40 +0800 Subject: [PATCH] fix: add lock for consumer crCh map Change-Id: I62867332751efe7468c9e6d332b308e79defd60b --- consumer/push_consumer.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go index 259980a6..dd2d7233 100644 --- a/consumer/push_consumer.go +++ b/consumer/push_consumer.go @@ -73,6 +73,7 @@ type pushConsumer struct { done chan struct{} closeOnce sync.Once crCh map[string]chan struct{} + crChLock sync.Mutex } func NewPushConsumer(opts ...Option) (*pushConsumer, error) { @@ -297,9 +298,12 @@ func (pc *pushConsumer) Subscribe(topic string, selector MessageSelector, if pc.option.Namespace != "" { topic = pc.option.Namespace + "%" + topic } + pc.crChLock.Lock() if _, ok := pc.crCh[topic]; !ok { pc.crCh[topic] = make(chan struct{}, pc.defaultConsumer.option.ConsumeGoroutineNums) } + pc.crChLock.Unlock() + data := buildSubscriptionData(topic, selector) pc.subscriptionDataTable.Store(topic, data) pc.subscribedTopic[topic] = "" @@ -1086,9 +1090,12 @@ func (pc *pushConsumer) consumeMessageConcurrently(pq *processQueue, mq *primiti limiter := pc.option.Limiter limiterOn := limiter != nil + pc.crChLock.Lock() if _, ok := pc.crCh[mq.Topic]; !ok { pc.crCh[mq.Topic] = make(chan struct{}, pc.defaultConsumer.option.ConsumeGoroutineNums) } + crCh := pc.crCh[mq.Topic] + pc.crChLock.Unlock() for count := 0; count < len(msgs); count++ { var subMsgs []*primitive.MessageExt @@ -1104,7 +1111,7 @@ func (pc *pushConsumer) consumeMessageConcurrently(pq *processQueue, mq *primiti if limiterOn { limiter(utils.WithoutNamespace(mq.Topic)) } - pc.crCh[mq.Topic] <- struct{}{} + crCh <- struct{}{} go primitive.WithRecover(func() { defer func() { @@ -1114,7 +1121,7 @@ func (pc *pushConsumer) consumeMessageConcurrently(pq *processQueue, mq *primiti rlog.LogKeyConsumerGroup: pc.consumerGroup, }) } - <-pc.crCh[mq.Topic] + <-crCh }() RETRY: if pq.IsDroppd() {