diff --git a/pom.xml b/pom.xml index a96d1d3..3298415 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.github.smueller18 flink-serialization - 0.10.0 + 0.10.1 jar Flink-Serialization diff --git a/src/main/java/com/github/smueller18/flink/serialization/AvroBuilderKeyedSerializationSchema.java b/src/main/java/com/github/smueller18/flink/serialization/AvroBuilderKeyedSerializationSchema.java index a7f353b..d3dc8ee 100644 --- a/src/main/java/com/github/smueller18/flink/serialization/AvroBuilderKeyedSerializationSchema.java +++ b/src/main/java/com/github/smueller18/flink/serialization/AvroBuilderKeyedSerializationSchema.java @@ -55,9 +55,9 @@ public AvroBuilderKeyedSerializationSchema(Map configs) { @Override public byte[] serializeKey(T avroBuilderObject) { - if (this.valueSerializer == null) { - this.valueSerializer = new KafkaAvroSerializer(); - this.valueSerializer.configure(this.configs, true); + if (this.keySerializer == null) { + this.keySerializer = new KafkaAvroSerializer(); + this.keySerializer.configure(this.configs, true); } return this.keySerializer.serialize(AvroBuilder.getTopicName(avroBuilderObject.getClass()), avroBuilderObject.getKeyRecord()); @@ -66,9 +66,9 @@ public byte[] serializeKey(T avroBuilderObject) { @Override public byte[] serializeValue(T avroBuilderObject) { - if (this.keySerializer == null) { - this.keySerializer = new KafkaAvroSerializer(); - this.keySerializer.configure(this.configs, false); + if (this.valueSerializer == null) { + this.valueSerializer = new KafkaAvroSerializer(); + this.valueSerializer.configure(this.configs, false); } return this.valueSerializer.serialize(AvroBuilder.getTopicName(avroBuilderObject.getClass()), avroBuilderObject.getValueRecord()); }