Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Injectable topic scaler factory #206

Open
wants to merge 1 commit into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public class KafkaExtensionConfigProvider : IExtensionConfigProvider
private readonly INameResolver nameResolver;
private readonly IWebJobsExtensionConfiguration<KafkaExtensionConfigProvider> configuration;
private readonly IKafkaProducerFactory kafkaProducerFactory;
private readonly IKafkaTopicScalerFactory kafkaTopicScalerFactory;
private readonly ILogger logger;

public KafkaExtensionConfigProvider(
Expand All @@ -38,7 +39,9 @@ public KafkaExtensionConfigProvider(
IConverterManager converterManager,
INameResolver nameResolver,
IWebJobsExtensionConfiguration<KafkaExtensionConfigProvider> configuration,
IKafkaProducerFactory kafkaProducerFactory)
IKafkaProducerFactory kafkaProducerFactory,
IKafkaTopicScalerFactory kafkaTopicScalerFactory
)
{
this.config = config;
this.options = options;
Expand All @@ -47,6 +50,7 @@ public KafkaExtensionConfigProvider(
this.nameResolver = nameResolver;
this.configuration = configuration;
this.kafkaProducerFactory = kafkaProducerFactory;
this.kafkaTopicScalerFactory = kafkaTopicScalerFactory;
this.logger = loggerFactory.CreateLogger(LogCategories.CreateTriggerCategory("Kafka"));
}

Expand All @@ -55,7 +59,7 @@ public void Initialize(ExtensionConfigContext context)
configuration.ConfigurationSection.Bind(options);

// register our trigger binding provider
var triggerBindingProvider = new KafkaTriggerAttributeBindingProvider(config, options, converterManager, nameResolver, loggerFactory);
var triggerBindingProvider = new KafkaTriggerAttributeBindingProvider(config, options, converterManager, nameResolver, kafkaTopicScalerFactory, loggerFactory);
context.AddBindingRule<KafkaTriggerAttribute>()
.BindToTrigger(triggerBindingProvider);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public static IWebJobsBuilder AddKafka(this IWebJobsBuilder builder, Action<Kafk
});

builder.Services.AddSingleton<IKafkaProducerFactory, KafkaProducerFactory>();
builder.Services.AddSingleton<IKafkaTopicScalerFactory, DefaultKafkaTopicScalerFactory>();

return builder;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using Confluent.Kafka;
using Microsoft.Extensions.Logging;

namespace Microsoft.Azure.WebJobs.Extensions.Kafka
{
public class DefaultKafkaTopicScalerFactory : IKafkaTopicScalerFactory
{
public KafkaTopicScaler<TKey, TValue> CreateKafkaTopicScaler<TKey, TValue>(string topic, string consumerGroup, string functionId, IConsumer<TKey, TValue> consumer, AdminClientConfig adminClientConfig, ILogger logger)
{
return new KafkaTopicScaler<TKey, TValue>(topic, consumerGroup, functionId, consumer, adminClientConfig, logger);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using Confluent.Kafka;
using Microsoft.Extensions.Logging;

namespace Microsoft.Azure.WebJobs.Extensions.Kafka
{
public interface IKafkaTopicScalerFactory
{
KafkaTopicScaler<TKey, TValue> CreateKafkaTopicScaler<TKey, TValue>(string topic, string consumerGroup, string functionId, IConsumer<TKey, TValue> consumer, AdminClientConfig adminClientConfig, ILogger logger);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ internal class KafkaListener<TKey, TValue> : IListener, IScaleMonitorProvider
private readonly KafkaListenerConfiguration listenerConfiguration;
// Indicates if the consumer requires the Kafka element key
private readonly bool requiresKey;
private readonly IKafkaTopicScalerFactory kafkaTopicScalerFactory;
private readonly ILogger logger;
private FunctionExecutorBase<TKey, TValue> functionExecutor;
private Lazy<IConsumer<TKey, TValue>> consumer;
Expand All @@ -62,10 +63,12 @@ public KafkaListener(
KafkaListenerConfiguration kafkaListenerConfiguration,
bool requiresKey,
IDeserializer<TValue> valueDeserializer,
IKafkaTopicScalerFactory kafkaTopicScalerFactory,
ILogger logger,
string functionId)
{
this.ValueDeserializer = valueDeserializer;
this.kafkaTopicScalerFactory = kafkaTopicScalerFactory;
this.executor = executor;
this.singleDispatch = singleDispatch;
this.options = options;
Expand Down Expand Up @@ -114,7 +117,7 @@ private IConsumer<TKey, TValue> CreateConsumer()

private KafkaTopicScaler<TKey, TValue> CreateTopicScaler()
{
return new KafkaTopicScaler<TKey, TValue>(this.listenerConfiguration.Topic, this.consumerGroup, this.functionId, this.consumer.Value, new AdminClientConfig(GetConsumerConfiguration()), this.logger);
return kafkaTopicScalerFactory.CreateKafkaTopicScaler(this.listenerConfiguration.Topic, this.consumerGroup, this.functionId, this.consumer.Value, new AdminClientConfig(GetConsumerConfiguration()), this.logger);
}

public void Cancel()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public ScaleStatus GetScaleStatus(ScaleStatusContext context)
return GetScaleStatusCore(context.WorkerCount, context.Metrics?.OfType<KafkaTriggerMetrics>().ToArray());
}

public ScaleStatus GetScaleStatus(ScaleStatusContext<KafkaTriggerMetrics> context)
public virtual ScaleStatus GetScaleStatus(ScaleStatusContext<KafkaTriggerMetrics> context)
{
return GetScaleStatusCore(context.WorkerCount, context.Metrics?.ToArray());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ internal class KafkaTriggerAttributeBindingProvider : ITriggerBindingProvider
private readonly IConfiguration config;
private readonly IConverterManager converterManager;
private readonly INameResolver nameResolver;
private readonly IKafkaTopicScalerFactory kafkaTopicScalerFactory;
private readonly IOptions<KafkaOptions> options;
private readonly ILogger logger;

Expand All @@ -29,11 +30,13 @@ public KafkaTriggerAttributeBindingProvider(
IOptions<KafkaOptions> options,
IConverterManager converterManager,
INameResolver nameResolver,
IKafkaTopicScalerFactory kafkaTopicScalerFactory,
ILoggerFactory loggerFactory)
{
this.config = config;
this.converterManager = converterManager;
this.nameResolver = nameResolver;
this.kafkaTopicScalerFactory = kafkaTopicScalerFactory;
this.options = options;
this.logger = loggerFactory.CreateLogger(LogCategories.CreateTriggerCategory("Kafka"));
}
Expand Down Expand Up @@ -75,6 +78,7 @@ Task<IListener> listenerCreator(ListenerFactoryContext factoryContext, bool sing
listenerConfiguration,
requiresKey,
valueDeserializer,
kafkaTopicScalerFactory,
this.logger,
factoryContext.Descriptor.Id);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public KafkaListenerForTest(ITriggeredFunctionExecutor executor,
kafkaListenerConfiguration,
requiresKey,
valueDeserializer,
new DefaultKafkaTopicScalerFactory(),
logger,
functionId)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ public async Task When_No_Type_Is_Set_Should_Create_ByteArray_Listener(string fu
config,
Options.Create(new KafkaOptions()),
new KafkaEventDataConvertManager(NullLogger.Instance),
new DefaultNameResolver(config),
new DefaultNameResolver(config),
new DefaultKafkaTopicScalerFactory(),
NullLoggerFactory.Instance);

var parameterInfo = new TriggerBindingProviderContext(this.GetParameterInfo(functionName), default);
Expand Down Expand Up @@ -152,6 +153,7 @@ public async Task When_String_Value_Type_Is_Set_Should_Create_String_Listener(st
Options.Create(new KafkaOptions()),
new KafkaEventDataConvertManager(NullLogger.Instance),
new DefaultNameResolver(config),
new DefaultKafkaTopicScalerFactory(),
NullLoggerFactory.Instance);

var parameterInfo = new TriggerBindingProviderContext(this.GetParameterInfo(functionName), default);
Expand Down Expand Up @@ -189,6 +191,7 @@ public async Task When_Avro_Schema_Is_Provided_Should_Create_GenericRecord_Liste
Options.Create(new KafkaOptions()),
new KafkaEventDataConvertManager(NullLogger.Instance),
new DefaultNameResolver(config),
new DefaultKafkaTopicScalerFactory(),
NullLoggerFactory.Instance);

var parameterInfo = new TriggerBindingProviderContext(this.GetParameterInfo(functionName), default);
Expand Down Expand Up @@ -226,6 +229,7 @@ public async Task When_Value_Type_Is_Specific_Record_Should_Create_SpecificRecor
Options.Create(new KafkaOptions()),
new KafkaEventDataConvertManager(NullLogger.Instance),
new DefaultNameResolver(config),
new DefaultKafkaTopicScalerFactory(),
NullLoggerFactory.Instance);

var parameterInfo = new TriggerBindingProviderContext(this.GetParameterInfo(functionName), default);
Expand Down Expand Up @@ -263,6 +267,7 @@ public async Task When_Value_Type_Is_Protobuf_Should_Create_Protobuf_Listener(st
Options.Create(new KafkaOptions()),
new KafkaEventDataConvertManager(NullLogger.Instance),
new DefaultNameResolver(config),
new DefaultKafkaTopicScalerFactory(),
NullLoggerFactory.Instance);

var parameterInfo = new TriggerBindingProviderContext(this.GetParameterInfo(functionName), default);
Expand Down Expand Up @@ -312,6 +317,7 @@ public async Task When_Value_Is_KafkaEventData_With_Key_Should_Create_Listener_W
Options.Create(new KafkaOptions()),
new KafkaEventDataConvertManager(NullLogger.Instance),
new DefaultNameResolver(config),
new DefaultKafkaTopicScalerFactory(),
NullLoggerFactory.Instance);

var parameterInfo = new TriggerBindingProviderContext(this.GetParameterInfo(functionName), default);
Expand Down