diff --git a/pkg/kafka/producer/producer.go b/pkg/kafka/producer/producer.go index c758ad0..03a064c 100644 --- a/pkg/kafka/producer/producer.go +++ b/pkg/kafka/producer/producer.go @@ -58,29 +58,42 @@ func (p *Producer) Produce(msg kafka.Message) (int32, int64, error) { func (p *Producer) Close() error { return p.producer.Close() } - func (p *Producer) ProduceTx(msg []kafka.Message) (int32, int64, error) { - time.Sleep(time.Millisecond * 200) + p.logger.Infof("Starting transaction for %d messages", len(msg)) err := p.producer.BeginTxn() if err != nil { + p.logger.Errorf("Failed to begin transaction: %s", err) return 0, 0, err } - var par int32 - var off int64 - for _, m := range msg { - par, off, err = p.Produce(m) + var partition int32 + var offset int64 + for i, m := range msg { + partition, offset, err = p.Produce(m) if err != nil { - err = p.producer.AbortTxn() - if err != nil { - return 0, 0, err + p.logger.Errorf("Failed to produce message %d: %s", i, err) + abortErr := p.producer.AbortTxn() + if abortErr != nil { + return 0, 0, fmt.Errorf("produce error: %w, abort txn error: %s", err, abortErr) } + return 0, 0, err } } - err = p.producer.CommitTxn() - if err != nil { - return 0, 0, err + retryCount := 3 + for i := 0; i < retryCount; i++ { + err = p.producer.CommitTxn() + if err == nil { + p.logger.Infof("Transaction committed successfully") + return partition, offset, nil + } + time.Sleep(100 * time.Millisecond) + } + + p.logger.Errorf("Failed to commit transaction after %d retries: %s", retryCount, err) + abortErr := p.producer.AbortTxn() + if abortErr != nil { + return 0, 0, fmt.Errorf("commit error: %w, abort txn error: %s", err, abortErr) } - return par, off, nil + return 0, 0, fmt.Errorf("commit error: %w", err) }