Skip to content

Commit

Permalink
fix serializeKey function
Browse files Browse the repository at this point in the history
  • Loading branch information
smueller18 committed Sep 25, 2017
1 parent 0f12060 commit 830efc0
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 7 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>com.github.smueller18</groupId>
<artifactId>flink-serialization</artifactId>
<version>0.10.0</version>
<version>0.10.1</version>
<packaging>jar</packaging>

<name>Flink-Serialization</name>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ public AvroBuilderKeyedSerializationSchema(Map<String, ?> configs) {
@Override
public byte[] serializeKey(T avroBuilderObject) {

if (this.valueSerializer == null) {
this.valueSerializer = new KafkaAvroSerializer();
this.valueSerializer.configure(this.configs, true);
if (this.keySerializer == null) {
this.keySerializer = new KafkaAvroSerializer();
this.keySerializer.configure(this.configs, true);
}

return this.keySerializer.serialize(AvroBuilder.getTopicName(avroBuilderObject.getClass()), avroBuilderObject.getKeyRecord());
Expand All @@ -66,9 +66,9 @@ public byte[] serializeKey(T avroBuilderObject) {
@Override
public byte[] serializeValue(T avroBuilderObject) {

if (this.keySerializer == null) {
this.keySerializer = new KafkaAvroSerializer();
this.keySerializer.configure(this.configs, false);
if (this.valueSerializer == null) {
this.valueSerializer = new KafkaAvroSerializer();
this.valueSerializer.configure(this.configs, false);
}
return this.valueSerializer.serialize(AvroBuilder.getTopicName(avroBuilderObject.getClass()), avroBuilderObject.getValueRecord());
}
Expand Down

0 comments on commit 830efc0

Please sign in to comment.