Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allowing event key that implements ISpecificRecord #440

Open
wants to merge 1 commit into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@ internal class KafkaListener<TKey, TValue> : IListener, IScaleMonitorProvider
/// <value>The value deserializer.</value>
internal IDeserializer<TValue> ValueDeserializer { get; }

/// <summary>
/// Gets the Key deserializer
/// </summary>
/// <value>The key deserializer.</value>
internal IDeserializer<TKey> KeyDeserializer { get; }

public KafkaListener(
ITriggeredFunctionExecutor executor,
bool singleDispatch,
Expand All @@ -63,9 +69,11 @@ public KafkaListener(
bool requiresKey,
IDeserializer<TValue> valueDeserializer,
ILogger logger,
string functionId)
string functionId,
IDeserializer<TKey> keyDeserializer)
{
this.ValueDeserializer = valueDeserializer;
this.KeyDeserializer = keyDeserializer;
this.executor = executor;
this.singleDispatch = singleDispatch;
this.options = options;
Expand Down Expand Up @@ -102,6 +110,7 @@ private IConsumer<TKey, TValue> CreateConsumer()
if (ValueDeserializer != null)
{
builder.SetValueDeserializer(ValueDeserializer);
builder.SetKeyDeserializer(KeyDeserializer);
}

builder.SetLogHandler((_, m) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public Task<IBinding> TryCreateAsync(BindingProviderContext context)
argumentBinding,
keyAndValueTypes.KeyType,
keyAndValueTypes.ValueType,
keyAndValueTypes.AvroSchema,
keyAndValueTypes.ValueAvroSchema,
this.config,
this.nameResolver);
return Task.FromResult<IBinding>(binding);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ namespace Microsoft.Azure.WebJobs.Extensions.Kafka
{
internal static class SerializationHelper
{
internal static object ResolveValueDeserializer(Type valueType, string specifiedAvroSchema)
internal static object ResolveDeserializer(Type valueType, string specifiedAvroSchema)
{
if (typeof(Google.Protobuf.IMessage).IsAssignableFrom(valueType))
{
Expand Down Expand Up @@ -84,7 +84,8 @@ internal class GetKeyAndValueTypesResult
public Type KeyType { get; set; }
public bool RequiresKey { get; set; }
public Type ValueType { get; set; }
public string AvroSchema { get; set; }
public string KeyAvroSchema { get; set; }
public string ValueAvroSchema { get; set; }
}

/// <summary>
Expand All @@ -94,6 +95,7 @@ internal class GetKeyAndValueTypesResult
internal static GetKeyAndValueTypesResult GetKeyAndValueTypes(string avroSchemaFromAttribute, Type parameterType, Type defaultKeyType)
{
string avroSchema = null;
string keyAvroSchema = null;
var requiresKey = false;

var valueType = parameterType;
Expand Down Expand Up @@ -128,6 +130,12 @@ internal static GetKeyAndValueTypesResult GetKeyAndValueTypes(string avroSchemaF
}
}

if (typeof(ISpecificRecord).IsAssignableFrom(keyType))
{
var specificRecord = (ISpecificRecord)Activator.CreateInstance(keyType);
keyAvroSchema = specificRecord.Schema.ToString();
}

if (typeof(ISpecificRecord).IsAssignableFrom(valueType))
{
var specificRecord = (ISpecificRecord)Activator.CreateInstance(valueType);
Expand All @@ -144,8 +152,9 @@ internal static GetKeyAndValueTypesResult GetKeyAndValueTypes(string avroSchemaF
{
KeyType = keyType,
ValueType = valueType,
AvroSchema = avroSchema,
ValueAvroSchema = avroSchema,
RequiresKey = requiresKey,
KeyAvroSchema = keyAvroSchema
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,19 +51,21 @@ public Task<ITriggerBinding> TryCreateAsync(TriggerBindingProviderContext contex
var consumerConfig = CreateConsumerConfiguration(attribute);

var keyAndValueTypes = SerializationHelper.GetKeyAndValueTypes(attribute.AvroSchema, parameter.ParameterType, typeof(string));
var valueDeserializer = SerializationHelper.ResolveValueDeserializer(keyAndValueTypes.ValueType, keyAndValueTypes.AvroSchema);

var binding = CreateBindingStrategyFor(keyAndValueTypes.KeyType ?? typeof(Ignore), keyAndValueTypes.ValueType, keyAndValueTypes.RequiresKey, valueDeserializer, parameter, consumerConfig);
var valueDeserializer = SerializationHelper.ResolveDeserializer(keyAndValueTypes.ValueType, keyAndValueTypes.ValueAvroSchema);
var keyDeserializer = SerializationHelper.ResolveDeserializer(keyAndValueTypes.KeyType, keyAndValueTypes.KeyAvroSchema);

var binding = CreateBindingStrategyFor(keyAndValueTypes.KeyType ?? typeof(Ignore), keyAndValueTypes.ValueType, keyAndValueTypes.RequiresKey, valueDeserializer, parameter, consumerConfig, keyDeserializer);
return Task.FromResult<ITriggerBinding>(new KafkaTriggerBindingWrapper(binding));
}

ITriggerBinding CreateBindingStrategyFor(Type keyType, Type valueType, bool requiresKey, object valueDeserializer, ParameterInfo parameterInfo, KafkaListenerConfiguration listenerConfiguration)
ITriggerBinding CreateBindingStrategyFor(Type keyType, Type valueType, bool requiresKey, object valueDeserializer, ParameterInfo parameterInfo, KafkaListenerConfiguration listenerConfiguration, object keyDeserializer)
{
var genericCreateBindingStrategy = this.GetType().GetMethod(nameof(CreateBindingStrategy), BindingFlags.Instance | BindingFlags.NonPublic).MakeGenericMethod(keyType, valueType);
return (ITriggerBinding)genericCreateBindingStrategy.Invoke(this, new object[] { parameterInfo, listenerConfiguration, requiresKey, valueDeserializer });
return (ITriggerBinding)genericCreateBindingStrategy.Invoke(this, new object[] { parameterInfo, listenerConfiguration, requiresKey, valueDeserializer, keyDeserializer });
}

private ITriggerBinding CreateBindingStrategy<TKey, TValue>(ParameterInfo parameter, KafkaListenerConfiguration listenerConfiguration, bool requiresKey, IDeserializer<TValue> valueDeserializer)
private ITriggerBinding CreateBindingStrategy<TKey, TValue>(ParameterInfo parameter, KafkaListenerConfiguration listenerConfiguration, bool requiresKey, IDeserializer<TValue> valueDeserializer, IDeserializer<TKey> keyDeserializer)
{
// TODO: reuse connections if they match with others in same function app
Task<IListener> listenerCreator(ListenerFactoryContext factoryContext, bool singleDispatch)
Expand All @@ -76,7 +78,8 @@ Task<IListener> listenerCreator(ListenerFactoryContext factoryContext, bool sing
requiresKey,
valueDeserializer,
this.logger,
factoryContext.Descriptor.Id);
factoryContext.Descriptor.Id,
keyDeserializer);

return Task.FromResult<IListener>(listener);
}
Expand Down