Skip to content

Commit

Permalink
more extensiver logging
Browse files Browse the repository at this point in the history
Co-authored-by: sindrerh2 <[email protected]>
Co-authored-by: ybelMekk <[email protected]>
  • Loading branch information
3 people committed Nov 26, 2024
1 parent 5615052 commit 440a90d
Showing 1 changed file with 26 additions and 13 deletions.
39 changes: 26 additions & 13 deletions pkg/kafka/producer/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,29 +58,42 @@ func (p *Producer) Produce(msg kafka.Message) (int32, int64, error) {
func (p *Producer) Close() error {
return p.producer.Close()
}

func (p *Producer) ProduceTx(msg []kafka.Message) (int32, int64, error) {
time.Sleep(time.Millisecond * 200)
p.logger.Infof("Starting transaction for %d messages", len(msg))

Check failure on line 62 in pkg/kafka/producer/producer.go

View workflow job for this annotation

GitHub Actions / Analyze (go)

p.logger undefined (type *Producer has no field or method logger)
err := p.producer.BeginTxn()
if err != nil {
p.logger.Errorf("Failed to begin transaction: %s", err)

Check failure on line 65 in pkg/kafka/producer/producer.go

View workflow job for this annotation

GitHub Actions / Analyze (go)

p.logger undefined (type *Producer has no field or method logger)
return 0, 0, err
}

var par int32
var off int64
for _, m := range msg {
par, off, err = p.Produce(m)
var partition int32
var offset int64
for i, m := range msg {
partition, offset, err = p.Produce(m)
if err != nil {
err = p.producer.AbortTxn()
if err != nil {
return 0, 0, err
p.logger.Errorf("Failed to produce message %d: %s", i, err)

Check failure on line 74 in pkg/kafka/producer/producer.go

View workflow job for this annotation

GitHub Actions / Analyze (go)

p.logger undefined (type *Producer has no field or method logger)
abortErr := p.producer.AbortTxn()
if abortErr != nil {
return 0, 0, fmt.Errorf("produce error: %w, abort txn error: %s", err, abortErr)

Check failure on line 77 in pkg/kafka/producer/producer.go

View workflow job for this annotation

GitHub Actions / Analyze (go)

undefined: fmt
}
return 0, 0, err
}
}

err = p.producer.CommitTxn()
if err != nil {
return 0, 0, err
retryCount := 3
for i := 0; i < retryCount; i++ {
err = p.producer.CommitTxn()
if err == nil {
p.logger.Infof("Transaction committed successfully")

Check failure on line 87 in pkg/kafka/producer/producer.go

View workflow job for this annotation

GitHub Actions / Analyze (go)

p.logger undefined (type *Producer has no field or method logger)
return partition, offset, nil
}
time.Sleep(100 * time.Millisecond)
}

p.logger.Errorf("Failed to commit transaction after %d retries: %s", retryCount, err)

Check failure on line 93 in pkg/kafka/producer/producer.go

View workflow job for this annotation

GitHub Actions / Analyze (go)

p.logger undefined (type *Producer has no field or method logger)
abortErr := p.producer.AbortTxn()
if abortErr != nil {
return 0, 0, fmt.Errorf("commit error: %w, abort txn error: %s", err, abortErr)

Check failure on line 96 in pkg/kafka/producer/producer.go

View workflow job for this annotation

GitHub Actions / Analyze (go)

undefined: fmt
}
return par, off, nil
return 0, 0, fmt.Errorf("commit error: %w", err)

Check failure on line 98 in pkg/kafka/producer/producer.go

View workflow job for this annotation

GitHub Actions / Analyze (go)

undefined: fmt
}

0 comments on commit 440a90d

Please sign in to comment.