diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/IKafkaEventData.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/IKafkaEventData.cs index 292b1961..43057367 100644 --- a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/IKafkaEventData.cs +++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/IKafkaEventData.cs @@ -12,6 +12,6 @@ public interface IKafkaEventData long Offset { get; } int Partition { get; } string Topic { get; } - DateTime Timestamp { get; } + DateTimeOffset Timestamp { get; } } } \ No newline at end of file diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/KafkaEventData.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/KafkaEventData.cs index 847a6807..e39149af 100644 --- a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/KafkaEventData.cs +++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/KafkaEventData.cs @@ -12,7 +12,7 @@ public class KafkaEventData : IKafkaEventData public long Offset { get; set; } public int Partition { get; set; } public string Topic { get; set; } - public DateTime Timestamp { get; set; } + public DateTimeOffset Timestamp { get; set; } public TValue Value { get; set; } object IKafkaEventData.Value => this.Value; @@ -35,7 +35,7 @@ public KafkaEventData(ConsumeResult consumeResult) this.Value = consumeResult.Value; this.Offset = consumeResult.Offset; this.Partition = consumeResult.Partition; - this.Timestamp = consumeResult.Timestamp.UtcDateTime; + this.Timestamp = DateTimeOffset.FromUnixTimeMilliseconds(consumeResult.Message.Timestamp.UnixTimestampMs); this.Topic = consumeResult.Topic; } } @@ -46,7 +46,7 @@ public class KafkaEventData : IKafkaEventData public long Offset { get; set; } public int Partition { get; set; } public string Topic { get; set; } - public DateTime Timestamp { get; set; } + public DateTimeOffset Timestamp { get; set; } public TValue Value { get; set; } object IKafkaEventData.Value => this.Value; @@ -69,7 +69,7 @@ internal static KafkaEventData CreateFrom(ConsumeResult GetBindingContract(bool isSingleDispatch = true) AddBindingContractMember(contract, nameof(KafkaEventData.Key), typeof(object), isSingleDispatch); AddBindingContractMember(contract, nameof(KafkaEventData.Partition), typeof(int), isSingleDispatch); AddBindingContractMember(contract, nameof(KafkaEventData.Topic), typeof(string), isSingleDispatch); - AddBindingContractMember(contract, nameof(KafkaEventData.Timestamp), typeof(DateTime), isSingleDispatch); + AddBindingContractMember(contract, nameof(KafkaEventData.Timestamp), typeof(DateTimeOffset), isSingleDispatch); AddBindingContractMember(contract, nameof(KafkaEventData.Offset), typeof(long), isSingleDispatch); return contract; @@ -86,7 +86,7 @@ internal static void AddBindingData(Dictionary bindingData, IKaf int length = events.Length; var partitions = new int[length]; var offsets = new long[length]; - var timestamps = new DateTime[length]; + var timestamps = new DateTimeOffset[length]; var topics = new string[length]; var keys = new object[length]; diff --git a/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaTriggerBindingStrategyTest.cs b/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaTriggerBindingStrategyTest.cs index 2be4ef39..5f465895 100644 --- a/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaTriggerBindingStrategyTest.cs +++ b/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaTriggerBindingStrategyTest.cs @@ -19,7 +19,7 @@ public void GetStaticBindingContract_ReturnsExpectedValue() Assert.Equal(typeof(object), contract["Key"]); Assert.Equal(typeof(int), contract["Partition"]); Assert.Equal(typeof(string), contract["Topic"]); - Assert.Equal(typeof(DateTime), contract["Timestamp"]); + Assert.Equal(typeof(DateTimeOffset), contract["Timestamp"]); Assert.Equal(typeof(long), contract["Offset"]); } @@ -33,7 +33,7 @@ public void GetBindingContract_SingleDispatch_ReturnsExpectedValue() Assert.Equal(typeof(object), contract["Key"]); Assert.Equal(typeof(int), contract["Partition"]); Assert.Equal(typeof(string), contract["Topic"]); - Assert.Equal(typeof(DateTime), contract["Timestamp"]); + Assert.Equal(typeof(DateTimeOffset), contract["Timestamp"]); Assert.Equal(typeof(long), contract["Offset"]); } @@ -45,7 +45,7 @@ public void SingleDispatch_GetBindingData_Should_Create_Data_From_Kafka_Event() Key = "1", Offset = 100, Partition = 2, - Timestamp = new DateTime(2019, 1, 10, 9, 21, 0, DateTimeKind.Utc), + Timestamp = new DateTimeOffset(2019, 1, 10, 9, 21, 0, TimeSpan.Zero), Topic = "myTopic", Value = "Nothing", }; @@ -55,14 +55,14 @@ public void SingleDispatch_GetBindingData_Should_Create_Data_From_Kafka_Event() Assert.Equal("1", binding["Key"]); Assert.Equal(100L, binding["Offset"]); Assert.Equal(2, binding["Partition"]); - Assert.Equal(new DateTime(2019, 1, 10, 9, 21, 0, DateTimeKind.Utc), binding["Timestamp"]); + Assert.Equal(new DateTimeOffset(2019, 1, 10, 9, 21, 0, TimeSpan.Zero), binding["Timestamp"]); Assert.Equal("myTopic", binding["Topic"]); // lower case too Assert.Equal("1", binding["key"]); Assert.Equal(100L, binding["offset"]); Assert.Equal(2, binding["partition"]); - Assert.Equal(new DateTime(2019, 1, 10, 9, 21, 0, DateTimeKind.Utc), binding["timestamp"]); + Assert.Equal(new DateTimeOffset(2019, 1, 10, 9, 21, 0, TimeSpan.Zero), binding["timestamp"]); Assert.Equal("myTopic", binding["topic"]); } @@ -76,7 +76,7 @@ public void MultiDispatch_GetBindingData_Should_Create_Data_From_Kafka_Event() Key = "1", Offset = 100, Partition = 2, - Timestamp = new DateTime(2019, 1, 10, 9, 21, 0, DateTimeKind.Utc), + Timestamp = new DateTimeOffset(2019, 1, 10, 9, 21, 0, TimeSpan.Zero), Topic = "myTopic", Value = "Nothing1", }, @@ -85,7 +85,7 @@ public void MultiDispatch_GetBindingData_Should_Create_Data_From_Kafka_Event() Key = "2", Offset = 101, Partition = 2, - Timestamp = new DateTime(2019, 1, 10, 9, 21, 1, DateTimeKind.Utc), + Timestamp = new DateTimeOffset(2019, 1, 10, 9, 21, 1, TimeSpan.Zero), Topic = "myTopic", Value = "Nothing2", }, @@ -96,14 +96,14 @@ public void MultiDispatch_GetBindingData_Should_Create_Data_From_Kafka_Event() Assert.Equal(new[] { "1", "2" }, binding["KeyArray"]); Assert.Equal(new[] { 100L, 101L }, binding["OffsetArray"]); Assert.Equal(new[] { 2, 2 }, binding["PartitionArray"]); - Assert.Equal(new[] { new DateTime(2019, 1, 10, 9, 21, 0, DateTimeKind.Utc), new DateTime(2019, 1, 10, 9, 21, 1, DateTimeKind.Utc) }, binding["TimestampArray"]); + Assert.Equal(new[] { new DateTimeOffset(2019, 1, 10, 9, 21, 0, TimeSpan.Zero), new DateTimeOffset(2019, 1, 10, 9, 21, 1, TimeSpan.Zero) }, binding["TimestampArray"]); Assert.Equal(new[] { "myTopic", "myTopic" }, binding["TopicArray"]); // lower case too Assert.Equal(new[] { "1", "2" }, binding["keyArray"]); Assert.Equal(new[] { 100L, 101L }, binding["offsetArray"]); Assert.Equal(new[] { 2, 2 }, binding["partitionArray"]); - Assert.Equal(new[] { new DateTime(2019, 1, 10, 9, 21, 0, DateTimeKind.Utc), new DateTime(2019, 1, 10, 9, 21, 1, DateTimeKind.Utc) }, binding["timestampArray"]); + Assert.Equal(new[] { new DateTimeOffset(2019, 1, 10, 9, 21, 0, TimeSpan.Zero), new DateTimeOffset(2019, 1, 10, 9, 21, 1, TimeSpan.Zero) }, binding["timestampArray"]); Assert.Equal(new[] { "myTopic", "myTopic" }, binding["topicArray"]); } }