From 52fed45e4cb7d3a62225d8a5d2efb223aba53877 Mon Sep 17 00:00:00 2001 From: wushengyu Date: Tue, 27 Aug 2024 10:33:55 +0800 Subject: [PATCH] rm msgs log, too many --- consumer/push_consumer.go | 51 ++++++++++++++++++--------------------- rlog/log.go | 1 - 2 files changed, 24 insertions(+), 28 deletions(-) diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go index 259980a6..9faf5685 100644 --- a/consumer/push_consumer.go +++ b/consumer/push_consumer.go @@ -821,11 +821,11 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) { SuspendTimeoutMillis: 20 * time.Second, } // - //if data.ExpType == string(TAG) { + // if data.ExpType == string(TAG) { // pullRequest.SubVersion = 0 - //} else { + // } else { // pullRequest.SubVersion = data.SubVersion - //} + // } brokerResult := pc.defaultConsumer.tryFindBroker(request.mq) if brokerResult == nil { @@ -954,31 +954,31 @@ func (pc *pushConsumer) resume() { } func (pc *pushConsumer) ResetOffset(topic string, table map[primitive.MessageQueue]int64) { - //topic := cmd.ExtFields["topic"] - //group := cmd.ExtFields["group"] - //if topic == "" || group == "" { + // topic := cmd.ExtFields["topic"] + // group := cmd.ExtFields["group"] + // if topic == "" || group == "" { // rlog.Warning("received reset offset command from: %s, but missing params.", from) // return - //} - //t, err := strconv.ParseInt(cmd.ExtFields["timestamp"], 10, 64) - //if err != nil { + // } + // t, err := strconv.ParseInt(cmd.ExtFields["timestamp"], 10, 64) + // if err != nil { // rlog.Warning("received reset offset command from: %s, but parse time error: %s", err.Error()) // return - //} - //rlog.Infof("invoke reset offset operation from broker. brokerAddr=%s, topic=%s, group=%s, timestamp=%v", + // } + // rlog.Infof("invoke reset offset operation from broker. brokerAddr=%s, topic=%s, group=%s, timestamp=%v", // from, topic, group, t) // - //offsetTable := make(map[MessageQueue]int64, 0) - //err = json.Unmarshal(cmd.Body, &offsetTable) - //if err != nil { + // offsetTable := make(map[MessageQueue]int64, 0) + // err = json.Unmarshal(cmd.Body, &offsetTable) + // if err != nil { // rlog.Warning("received reset offset command from: %s, but parse offset table: %s", err.Error()) // return - //} - //v, exist := c.consumerMap.Load(group) - //if !exist { + // } + // v, exist := c.consumerMap.Load(group) + // if !exist { // rlog.Infof("[reset-offset] consumer dose not exist. group=%s", group) // return - //} + // } pc.suspend() defer pc.resume() @@ -1149,14 +1149,12 @@ func (pc *pushConsumer) consumeMessageConcurrently(pq *processQueue, mq *primiti if err != nil { rlog.Warning("consumeMessageCurrently error", map[string]interface{}{ rlog.LogKeyUnderlayError: err, - rlog.LogKeyMessages: msgs, rlog.LogKeyMessageQueue: mq, rlog.LogKeyConsumerGroup: pc.consumerGroup, }) msgCtx.Properties[primitive.PropCtxType] = string(primitive.ExceptionReturn) } else if consumeRT >= pc.option.ConsumeTimeout { rlog.Warning("consumeMessageCurrently time out", map[string]interface{}{ - rlog.LogKeyMessages: msgs, rlog.LogKeyMessageQueue: mq, rlog.LogKeyConsumerGroup: pc.consumerGroup, }) @@ -1291,7 +1289,6 @@ func (pc *pushConsumer) consumeMessageOrderly(pq *processQueue, mq *primitive.Me if err != nil { rlog.Warning("consumeMessage orderly error", map[string]interface{}{ rlog.LogKeyUnderlayError: err, - rlog.LogKeyMessages: msgs, rlog.LogKeyMessageQueue: mq.String(), rlog.LogKeyConsumerGroup: pc.consumerGroup, }) @@ -1307,15 +1304,15 @@ func (pc *pushConsumer) consumeMessageOrderly(pq *processQueue, mq *primitive.Me } // just put consumeResult in consumerMessageCtx - //interval = time.Now().Sub(beginTime) - //consumeReult := SuccessReturn - //if interval > pc.option.ConsumeTimeout { + // interval = time.Now().Sub(beginTime) + // consumeReult := SuccessReturn + // if interval > pc.option.ConsumeTimeout { // consumeReult = TimeoutReturn - //} else if SuspendCurrentQueueAMoment == result { + // } else if SuspendCurrentQueueAMoment == result { // consumeReult = FailedReturn - //} else if ConsumeSuccess == result { + // } else if ConsumeSuccess == result { // consumeReult = SuccessReturn - //} + // } // process result commitOffset := int64(-1) diff --git a/rlog/log.go b/rlog/log.go index 5c99e2d9..d55b3343 100644 --- a/rlog/log.go +++ b/rlog/log.go @@ -43,7 +43,6 @@ const ( LogKeyStoreHost = "storeHost" LogKeyQueueId = "queueId" LogKeyQueueOffset = "queueOffset" - LogKeyMessages = "messages" ) type Logger interface {