diff --git a/README.md b/README.md index b9f71dbf..6a579416 100644 --- a/README.md +++ b/README.md @@ -322,6 +322,7 @@ The settings exposed here are targeted to more advanced users that want to custo |LibkafkaDebug|debug|Both |MetadataMaxAgeMs|metadata.max.age.ms|Both |SocketKeepaliveEnable|socket.keepalive.enable|Both +|ClientId|client.id|Both **NOTE:** `MetadataMaxAgeMs` default is `180000` `SocketKeepaliveEnable` default is `true` otherwise, the default value is the same as the [Configuration properties](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md). The reason of the default settings, refer to this [issue](https://github.com/Azure/azure-functions-kafka-extension/issues/187). diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Listeners/KafkaListener.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Listeners/KafkaListener.cs index 3511fd36..af6a859a 100644 --- a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Listeners/KafkaListener.cs +++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Listeners/KafkaListener.cs @@ -4,6 +4,7 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Net; using System.Threading; using System.Threading.Tasks; using Confluent.Kafka; @@ -166,6 +167,8 @@ private ConsumerConfig GetConsumerConfiguration() // start from earliest if no checkpoint has been committed AutoOffsetReset = AutoOffsetReset.Earliest, + ClientId = this.listenerConfiguration.ClientId ?? Dns.GetHostName(), + // Secure communication/authentication SaslMechanism = this.listenerConfiguration.SaslMechanism, SaslUsername = this.listenerConfiguration.SaslUsername, diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaAttribute.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaAttribute.cs index 74cc80b0..1747b376 100644 --- a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaAttribute.cs +++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaAttribute.cs @@ -139,5 +139,12 @@ public KafkaAttribute() /// ssl.key.password in librdkafka /// public string SslKeyPassword { get; set; } + + /// + /// Client identifier. + /// Default: the hostname of the client machine + /// client.id in librdkafka + /// + public string ClientId { get; set; } } } \ No newline at end of file diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerFactory.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerFactory.cs index 178f3ffc..3a8280ce 100644 --- a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerFactory.cs +++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerFactory.cs @@ -3,6 +3,7 @@ using System; using System.Collections.Concurrent; +using System.Net; using System.Reflection; using System.Text; using Avro.Generic; @@ -130,7 +131,8 @@ public ProducerConfig GetProducerConfig(KafkaProducerEntity entity) SslCaLocation = resolvedSslCaLocation, Debug = kafkaOptions?.LibkafkaDebug, MetadataMaxAgeMs = kafkaOptions?.MetadataMaxAgeMs, - SocketKeepaliveEnable = kafkaOptions?.SocketKeepaliveEnable + SocketKeepaliveEnable = kafkaOptions?.SocketKeepaliveEnable, + ClientId = this.config.ResolveSecureSetting(nameResolver, entity.Attribute.ClientId) ?? Dns.GetHostName() }; if (entity.Attribute.AuthenticationMode != BrokerAuthenticationMode.NotSet) diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Trigger/KafkaListenerConfiguration.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Trigger/KafkaListenerConfiguration.cs index 63e69cfe..7851519e 100644 --- a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Trigger/KafkaListenerConfiguration.cs +++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Trigger/KafkaListenerConfiguration.cs @@ -94,6 +94,13 @@ public class KafkaListenerConfiguration /// public string SslKeyPassword { get; set; } + /// + /// Client identifier. + /// Default: the hostname of the client machine + /// client.id in librdkafka + /// + public string ClientId { get; set; } + internal void ApplyToConfig(ClientConfig conf) { if (this.SaslMechanism.HasValue) diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Trigger/KafkaTriggerAttribute.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Trigger/KafkaTriggerAttribute.cs index e8cb85cf..56f7b074 100644 --- a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Trigger/KafkaTriggerAttribute.cs +++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Trigger/KafkaTriggerAttribute.cs @@ -107,6 +107,12 @@ public KafkaTriggerAttribute(string brokerList, string topic) /// public string SslKeyPassword { get; set; } + /// + /// Client identifier. + /// Default: the hostname of the client machine + /// client.id in librdkafka + /// + public string ClientId { get; set; } bool IsValidValueType(Type value) { diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Trigger/KafkaTriggerAttributeBindingProvider.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Trigger/KafkaTriggerAttributeBindingProvider.cs index 22218234..b8f27104 100644 --- a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Trigger/KafkaTriggerAttributeBindingProvider.cs +++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Trigger/KafkaTriggerAttributeBindingProvider.cs @@ -92,6 +92,7 @@ private KafkaListenerConfiguration CreateConsumerConfiguration(KafkaTriggerAttri ConsumerGroup = this.config.ResolveSecureSetting(nameResolver, attribute.ConsumerGroup), Topic = this.config.ResolveSecureSetting(nameResolver, attribute.Topic), EventHubConnectionString = this.config.ResolveSecureSetting(nameResolver, attribute.EventHubConnectionString), + ClientId = this.config.ResolveSecureSetting(nameResolver, attribute.ClientId) }; if (attribute.AuthenticationMode != BrokerAuthenticationMode.NotSet || diff --git a/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaListenerTest.cs b/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaListenerTest.cs index 804f47d7..e492a014 100644 --- a/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaListenerTest.cs +++ b/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaListenerTest.cs @@ -10,6 +10,7 @@ using System.Collections.Concurrent; using System.IO; using System.Linq; +using System.Net; using System.Threading; using System.Threading.Tasks; using Xunit; @@ -342,7 +343,7 @@ public async Task When_Options_Are_Set_Should_Be_Set_In_Consumer_Config() await target.StartAsync(default); - Assert.Equal(12, target.ConsumerConfig.Count()); + Assert.Equal(13, target.ConsumerConfig.Count()); Assert.Equal("testBroker", target.ConsumerConfig.BootstrapServers); Assert.Equal("group1", target.ConsumerConfig.GroupId); Assert.Equal("password1", target.ConsumerConfig.SslKeyPassword); @@ -355,10 +356,45 @@ public async Task When_Options_Are_Set_Should_Be_Set_In_Consumer_Config() Assert.Equal(180000, target.ConsumerConfig.MetadataMaxAgeMs); Assert.Equal(true, target.ConsumerConfig.SocketKeepaliveEnable); Assert.Equal(AutoOffsetReset.Earliest, target.ConsumerConfig.AutoOffsetReset); + Assert.Equal(Dns.GetHostName(), target.ConsumerConfig.ClientId); await target.StopAsync(default); } + [Fact] + public async Task When_ClientId_Is_Set_Should_Be_Set_In_Consumer_Config() + { + var executor = new Mock(); + var consumer = new Mock>(); + + var listenerConfig = new KafkaListenerConfiguration() + { + BrokerList = "testBroker", + Topic = "topic", + ClientId = "testclientid" + }; + + var kafkaOptions = new KafkaOptions(); + var target = new KafkaListenerForTest( + executor.Object, + true, + kafkaOptions, + listenerConfig, + requiresKey: true, + valueDeserializer: null, + NullLogger.Instance, + functionId: "testId" + ); + + target.SetConsumer(consumer.Object); + + await target.StartAsync(default); + + Assert.Equal("testclientid", target.ConsumerConfig.ClientId); + + await target.StopAsync(default); + } + [Fact] public async Task When_Options_With_Ssal_Are_Set_Should_Be_Set_In_Consumer_Config() { @@ -392,7 +428,7 @@ public async Task When_Options_With_Ssal_Are_Set_Should_Be_Set_In_Consumer_Confi await target.StartAsync(default); - Assert.Equal(12, target.ConsumerConfig.Count()); + Assert.Equal(13, target.ConsumerConfig.Count()); Assert.Equal("testBroker", target.ConsumerConfig.BootstrapServers); Assert.Equal("group1", target.ConsumerConfig.GroupId); Assert.Equal(kafkaOptions.AutoCommitIntervalMs, target.ConsumerConfig.AutoCommitIntervalMs); diff --git a/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaProducerFactoryTest.cs b/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaProducerFactoryTest.cs index a0221620..f93f5033 100644 --- a/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaProducerFactoryTest.cs +++ b/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaProducerFactoryTest.cs @@ -5,6 +5,7 @@ using System.Collections.Generic; using System.IO; using System.Linq; +using System.Net; using System.Reflection; using Avro.Generic; @@ -304,5 +305,54 @@ public void GetProducerConfig_When_Ssl_Locations_Resolve_InAzure_Should_Contain_ Assert.Equal(sslCa.FullName, config.SslCaLocation); Assert.Equal(sslKeyLocation.FullName, config.SslKeyLocation); } + + [Fact] + public void GetProducerConfig_When_Client_Defined_Should_SetFromConfig() + { + var attribute = new KafkaAttribute("brokers:9092", "myTopic") + { + AuthenticationMode = BrokerAuthenticationMode.Plain, + Protocol = BrokerProtocol.SaslSsl, + Username = "myuser", + Password = "secret", + ClientId = "%clientid%" + }; + + var entity = new KafkaProducerEntity() + { + Attribute = attribute, + ValueType = typeof(ProtoUser), + }; + + var configuration = new ConfigurationBuilder().AddInMemoryCollection(new Dictionary { + ["clientid"] = "SpecifiedClientId" + }).Build(); + + var factory = new KafkaProducerFactory(configuration, new DefaultNameResolver(configuration), NullLoggerProvider.Instance); + var config = factory.GetProducerConfig(entity); + Assert.Equal("SpecifiedClientId", config.ClientId); + } + + [Fact] + public void GetProducerConfig_When_Client_Not_Defined_Should_SetHostName() + { + var attribute = new KafkaAttribute("brokers:9092", "myTopic") + { + AuthenticationMode = BrokerAuthenticationMode.Plain, + Protocol = BrokerProtocol.SaslSsl, + Username = "myuser", + Password = "secret" + }; + + var entity = new KafkaProducerEntity() + { + Attribute = attribute, + ValueType = typeof(ProtoUser), + }; + + var factory = new KafkaProducerFactory(emptyConfiguration, new DefaultNameResolver(emptyConfiguration), NullLoggerProvider.Instance); + var config = factory.GetProducerConfig(entity); + Assert.Equal(Dns.GetHostName(), config.ClientId); + } } } \ No newline at end of file