From 0f120608d6b23ded54a5c5041bf34141cf8592a5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stephan=20M=C3=BCller?= Date: Fri, 22 Sep 2017 19:10:02 +0200 Subject: [PATCH] update to new schema registry version --- flink-serialization.iml | 52 +++++++++----- pom.xml | 8 +-- .../AvroBuilderKeyedSerializationSchema.java | 69 +++++++++++++------ .../serialization/GenericKeyValueRecord.java | 28 ++++++-- .../serialization/KafkaAvroKeyEncoder.java | 28 -------- .../serialization/KafkaAvroValueEncoder.java | 28 -------- ...emaRegistryKeyedDeserializationSchema.java | 63 ++++++++++++----- ...chemaRegistryKeyedSerializationSchema.java | 55 --------------- 8 files changed, 153 insertions(+), 178 deletions(-) delete mode 100644 src/main/java/com/github/smueller18/flink/serialization/KafkaAvroKeyEncoder.java delete mode 100644 src/main/java/com/github/smueller18/flink/serialization/KafkaAvroValueEncoder.java delete mode 100644 src/main/java/com/github/smueller18/flink/serialization/SchemaRegistryKeyedSerializationSchema.java diff --git a/flink-serialization.iml b/flink-serialization.iml index 5da2840..73f3a8d 100644 --- a/flink-serialization.iml +++ b/flink-serialization.iml @@ -12,6 +12,19 @@ + + + + + + + + + + + + + @@ -31,19 +44,19 @@ - - - - - - - - + + + + + + + + - + @@ -67,15 +80,16 @@ - - - - - - - - - - + + + + + + + + + + + \ No newline at end of file diff --git a/pom.xml b/pom.xml index 1943907..a96d1d3 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.github.smueller18 flink-serialization - 0.8 + 0.10.0 jar Flink-Serialization @@ -15,10 +15,10 @@ 0.7 0.10.2.1 - 1.8.1 - 1.3.0 + 1.8.2 + 1.3.2 2.11 - 3.2.1 + 3.3.0 diff --git a/src/main/java/com/github/smueller18/flink/serialization/AvroBuilderKeyedSerializationSchema.java b/src/main/java/com/github/smueller18/flink/serialization/AvroBuilderKeyedSerializationSchema.java index 7545a76..a7f353b 100644 --- a/src/main/java/com/github/smueller18/flink/serialization/AvroBuilderKeyedSerializationSchema.java +++ b/src/main/java/com/github/smueller18/flink/serialization/AvroBuilderKeyedSerializationSchema.java @@ -1,55 +1,80 @@ +/* + * The MIT License (MIT) + * + * Copyright (c) 2017 Stephan Müller + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + + package com.github.smueller18.flink.serialization; +import java.util.Map; + import com.github.smueller18.avro.builder.AvroBuilder; -import kafka.utils.VerifiableProperties; import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; +import io.confluent.kafka.serializers.KafkaAvroSerializer; -import java.util.Properties; -/** - * Copyright 2017 Stephan Müller - * License: MIT - */ public class AvroBuilderKeyedSerializationSchema implements KeyedSerializationSchema { - private Properties props; + private Map configs; - private transient KafkaAvroKeyEncoder keyEncoder; - private transient KafkaAvroValueEncoder valueEncoder; + private transient KafkaAvroSerializer keySerializer; + private transient KafkaAvroSerializer valueSerializer; /*** * - * @param props properties for {@link KafkaAvroKeyEncoder} and {@link KafkaAvroValueEncoder} + * @param configs key-value-pairs for {@link KafkaAvroSerializer} * schema.registry.url ({@link String}): * Comma-separated list of URLs for schema registry instances that can be used to register or look up schemas * max.schemas.per.subject ({@link Integer}, default: 1000): * Maximum number of schemas to create or cache locally * */ - public AvroBuilderKeyedSerializationSchema(Properties props) { - this.props = props; + public AvroBuilderKeyedSerializationSchema(Map configs) { + this.configs = configs; } @Override - public byte[] serializeKey(T avroBuilder) { + public byte[] serializeKey(T avroBuilderObject) { - if (this.keyEncoder == null) - this.keyEncoder = new KafkaAvroKeyEncoder(new VerifiableProperties(this.props)); + if (this.valueSerializer == null) { + this.valueSerializer = new KafkaAvroSerializer(); + this.valueSerializer.configure(this.configs, true); + } - return this.keyEncoder.toBytes(avroBuilder.getKeyRecord()); + return this.keySerializer.serialize(AvroBuilder.getTopicName(avroBuilderObject.getClass()), avroBuilderObject.getKeyRecord()); } @Override - public byte[] serializeValue(T avroBuilder) { - - if (this.valueEncoder == null) - this.valueEncoder = new KafkaAvroValueEncoder(new VerifiableProperties(this.props)); + public byte[] serializeValue(T avroBuilderObject) { - return this.valueEncoder.toBytes(avroBuilder.getValueRecord()); + if (this.keySerializer == null) { + this.keySerializer = new KafkaAvroSerializer(); + this.keySerializer.configure(this.configs, false); + } + return this.valueSerializer.serialize(AvroBuilder.getTopicName(avroBuilderObject.getClass()), avroBuilderObject.getValueRecord()); } @Override - public String getTargetTopic(T avroBuilder) { + public String getTargetTopic(T avroBuilderObject) { return null; } diff --git a/src/main/java/com/github/smueller18/flink/serialization/GenericKeyValueRecord.java b/src/main/java/com/github/smueller18/flink/serialization/GenericKeyValueRecord.java index aa5682c..65770ff 100644 --- a/src/main/java/com/github/smueller18/flink/serialization/GenericKeyValueRecord.java +++ b/src/main/java/com/github/smueller18/flink/serialization/GenericKeyValueRecord.java @@ -1,11 +1,31 @@ +/* + * The MIT License (MIT) + * + * Copyright (c) 2017 Stephan Müller + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + package com.github.smueller18.flink.serialization; import org.apache.avro.generic.GenericRecord; -/** - * Copyright 2017 Stephan Müller - * License: MIT - */ public class GenericKeyValueRecord { diff --git a/src/main/java/com/github/smueller18/flink/serialization/KafkaAvroKeyEncoder.java b/src/main/java/com/github/smueller18/flink/serialization/KafkaAvroKeyEncoder.java deleted file mode 100644 index a1fdf04..0000000 --- a/src/main/java/com/github/smueller18/flink/serialization/KafkaAvroKeyEncoder.java +++ /dev/null @@ -1,28 +0,0 @@ -package com.github.smueller18.flink.serialization; - -import io.confluent.kafka.serializers.AbstractKafkaAvroSerializer; -import org.apache.avro.generic.GenericContainer; -import org.apache.kafka.common.errors.SerializationException; - -import kafka.serializer.Encoder; -import kafka.utils.VerifiableProperties; - -/** - * Copyright 2017 Stephan Müller - * License: MIT - */ - -public class KafkaAvroKeyEncoder extends AbstractKafkaAvroSerializer implements Encoder { - - public KafkaAvroKeyEncoder(VerifiableProperties props) { - configure(serializerConfig(props)); - } - - @Override - public byte[] toBytes(Object object) { - if (object instanceof GenericContainer) - return serializeImpl(getSubjectName(((GenericContainer) object).getSchema().getFullName(), true), object); - else - throw new SerializationException("Primitive types are not supported yet"); - } -} diff --git a/src/main/java/com/github/smueller18/flink/serialization/KafkaAvroValueEncoder.java b/src/main/java/com/github/smueller18/flink/serialization/KafkaAvroValueEncoder.java deleted file mode 100644 index 7eac537..0000000 --- a/src/main/java/com/github/smueller18/flink/serialization/KafkaAvroValueEncoder.java +++ /dev/null @@ -1,28 +0,0 @@ -package com.github.smueller18.flink.serialization; - -import io.confluent.kafka.serializers.AbstractKafkaAvroSerializer; -import org.apache.avro.generic.GenericContainer; -import org.apache.kafka.common.errors.SerializationException; - -import kafka.serializer.Encoder; -import kafka.utils.VerifiableProperties; - -/** - * Copyright 2017 Stephan Müller - * License: MIT - */ - -public class KafkaAvroValueEncoder extends AbstractKafkaAvroSerializer implements Encoder { - - public KafkaAvroValueEncoder(VerifiableProperties props) { - configure(serializerConfig(props)); - } - - @Override - public byte[] toBytes(Object object) { - if (object instanceof GenericContainer) - return serializeImpl(getSubjectName(((GenericContainer) object).getSchema().getFullName(), false), object); - else - throw new SerializationException("Primitive types are not supported yet"); - } -} diff --git a/src/main/java/com/github/smueller18/flink/serialization/SchemaRegistryKeyedDeserializationSchema.java b/src/main/java/com/github/smueller18/flink/serialization/SchemaRegistryKeyedDeserializationSchema.java index bb6f94d..5071d04 100644 --- a/src/main/java/com/github/smueller18/flink/serialization/SchemaRegistryKeyedDeserializationSchema.java +++ b/src/main/java/com/github/smueller18/flink/serialization/SchemaRegistryKeyedDeserializationSchema.java @@ -1,58 +1,85 @@ +/* + * The MIT License (MIT) + * + * Copyright (c) 2017 Stephan Müller + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + + package com.github.smueller18.flink.serialization; -import io.confluent.kafka.serializers.KafkaAvroDecoder; -import kafka.utils.VerifiableProperties; +import io.confluent.kafka.serializers.KafkaAvroDeserializer; import org.apache.avro.generic.GenericRecord; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.kafka.common.errors.SerializationException; import java.io.IOException; import java.net.ConnectException; -import java.util.Properties; +import java.util.Map; -/** - * Copyright 2017 Stephan Müller - * License: MIT - */ public class SchemaRegistryKeyedDeserializationSchema implements KeyedDeserializationSchema { - private Properties props; - private transient KafkaAvroDecoder decoder; + private Map configs; + private transient KafkaAvroDeserializer keyDeserializer; + private transient KafkaAvroDeserializer valueDeserializer; /*** * - * @param props properties for {@link KafkaAvroDecoder} + * @param configs key-value-pairs for {@link KafkaAvroDeserializer} * schema.registry.url ({@link String}): * Comma-separated list of URLs for schema registry instances that can be used to register or look up schemas * max.schemas.per.subject ({@link Integer}, default: 1000): * Maximum number of schemas to create or cache locally * */ - public SchemaRegistryKeyedDeserializationSchema(Properties props) { - this.props = props; + public SchemaRegistryKeyedDeserializationSchema(Map configs) { + this.configs = configs; } @Override public GenericKeyValueRecord deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException { - if(decoder == null) - decoder = new KafkaAvroDecoder(new VerifiableProperties(this.props)); + if(this.keyDeserializer == null) { + this.keyDeserializer = new KafkaAvroDeserializer(); + this.keyDeserializer.configure(this.configs, true); + } + + if(this.valueDeserializer == null) { + this.valueDeserializer = new KafkaAvroDeserializer(); + this.valueDeserializer.configure(this.configs, false); + } try { return new GenericKeyValueRecord( - (GenericRecord) decoder.fromBytes(messageKey), - (GenericRecord) decoder.fromBytes(message) + (GenericRecord) keyDeserializer.deserialize(null, messageKey), + (GenericRecord) valueDeserializer.deserialize(null, message) ); } catch(Exception e) { if (e.getCause() instanceof ConnectException) throw new ConnectException( String.format("Connection to schema registry '%s' could not be established.", - this.props.getProperty("schema.registry.url") + this.configs.get("schema.registry.url") ) ); else throw e; diff --git a/src/main/java/com/github/smueller18/flink/serialization/SchemaRegistryKeyedSerializationSchema.java b/src/main/java/com/github/smueller18/flink/serialization/SchemaRegistryKeyedSerializationSchema.java deleted file mode 100644 index 22c7923..0000000 --- a/src/main/java/com/github/smueller18/flink/serialization/SchemaRegistryKeyedSerializationSchema.java +++ /dev/null @@ -1,55 +0,0 @@ -package com.github.smueller18.flink.serialization; - -import kafka.utils.VerifiableProperties; -import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; - -import java.util.Properties; - -/** - * Copyright 2017 Stephan Müller - * License: MIT - */ -public class SchemaRegistryKeyedSerializationSchema implements KeyedSerializationSchema { - - private Properties props; - - private transient KafkaAvroKeyEncoder keyEncoder; - private transient KafkaAvroValueEncoder valueEncoder; - - /*** - * - * @param props properties for {@link KafkaAvroKeyEncoder} and {@link KafkaAvroValueEncoder} - * schema.registry.url ({@link String}): - * Comma-separated list of URLs for schema registry instances that can be used to register or look up schemas - * max.schemas.per.subject ({@link Integer}, default: 1000): - * Maximum number of schemas to create or cache locally - * - */ - public SchemaRegistryKeyedSerializationSchema(Properties props) { - this.props = props; - } - - @Override - public byte[] serializeKey(GenericKeyValueRecord genericKeyValueRecord) { - - if (this.keyEncoder == null) - this.keyEncoder = new KafkaAvroKeyEncoder(new VerifiableProperties(this.props)); - - return this.keyEncoder.toBytes(genericKeyValueRecord); - } - - @Override - public byte[] serializeValue(GenericKeyValueRecord genericKeyValueRecord) { - - if (this.valueEncoder == null) - this.valueEncoder = new KafkaAvroValueEncoder(new VerifiableProperties(this.props)); - - return this.valueEncoder.toBytes(genericKeyValueRecord); - } - - @Override - public String getTargetTopic(GenericKeyValueRecord genericKeyValueRecord) { - return null; - } - -}