Skip to content

Commit

Permalink
Add EnableDeliveryReports to KafkaAttribute
Browse files Browse the repository at this point in the history
  • Loading branch information
gliljas committed Nov 6, 2022
1 parent 5ae3079 commit 58f134e
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 0 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,7 @@ The settings exposed here are targeted to more advanced users that want to custo
|LibkafkaDebug|debug|Both
|MetadataMaxAgeMs|metadata.max.age.ms|Both
|SocketKeepaliveEnable|socket.keepalive.enable|Both
|EnableDeliveryReports|Feature of Confluent.Kafka|Output

**NOTE:** `MetadataMaxAgeMs` default is `180000` `SocketKeepaliveEnable` default is `true` otherwise, the default value is the same as the [Configuration properties](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md). The reason of the default settings, refer to this [issue](https://github.com/Azure/azure-functions-kafka-extension/issues/187).
**NOTE:** `AutoOffsetReset` default is Earliest. Allowed Values are `Earliest` and `Latest`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,5 +139,12 @@ public KafkaAttribute()
/// ssl.key.password in librdkafka
/// </summary>
public string SslKeyPassword { get; set; }

/// <summary>
/// Specifies whether to enable notification of delivery reports. Typically you should
/// set this parameter to true. Set it to false for "fire and forget" semantics and
/// a small boost in performance. default: true importance: low
/// </summary>
public bool EnableDeliveryReports { get; set; } = true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,9 @@ public ProducerConfig GetProducerConfig(KafkaProducerEntity entity)
{
BootstrapServers = this.config.ResolveSecureSetting(nameResolver, entity.Attribute.BrokerList),
BatchNumMessages = entity.Attribute.BatchSize,
EnableDeliveryReports = entity.Attribute.EnableDeliveryReports,
EnableIdempotence = entity.Attribute.EnableIdempotence,
MessageMaxBytes = entity.Attribute.MaxMessageBytes,
MessageSendMaxRetries = entity.Attribute.MaxRetries,
MessageTimeoutMs = entity.Attribute.MessageTimeoutMs,
RequestTimeoutMs = entity.Attribute.RequestTimeoutMs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,5 +304,37 @@ public void GetProducerConfig_When_Ssl_Locations_Resolve_InAzure_Should_Contain_
Assert.Equal(sslCa.FullName, config.SslCaLocation);
Assert.Equal(sslKeyLocation.FullName, config.SslKeyLocation);
}

[Fact]
public void GetProducerConfig_Copies_Properties_From_Attribute()
{
var attribute = new KafkaAttribute("brokers:9092", "myTopic")
{
EnableDeliveryReports = false,
BatchSize = 123,
EnableIdempotence = true,
MaxMessageBytes = 234,
MaxRetries = 345,
MessageTimeoutMs = 456,
RequestTimeoutMs = 567
};

var entity = new KafkaProducerEntity
{
Attribute = attribute
};

var factory = new KafkaProducerFactory(emptyConfiguration, new DefaultNameResolver(emptyConfiguration), NullLoggerProvider.Instance);
var config = factory.GetProducerConfig(entity);

Assert.Equal(attribute.EnableDeliveryReports, config.EnableDeliveryReports);
Assert.Equal(attribute.BatchSize, config.BatchNumMessages);
Assert.Equal(attribute.EnableIdempotence, config.EnableIdempotence);
Assert.Equal(attribute.MaxMessageBytes, config.MessageMaxBytes);
Assert.Equal(attribute.MaxRetries, config.MessageSendMaxRetries);
Assert.Equal(attribute.MessageTimeoutMs, config.MessageTimeoutMs);
Assert.Equal(attribute.RequestTimeoutMs, config.RequestTimeoutMs);
Assert.Equal(attribute.EnableDeliveryReports, config.EnableDeliveryReports);
}
}
}

0 comments on commit 58f134e

Please sign in to comment.