Skip to content

Commit

Permalink
update to new schema registry version
Browse files Browse the repository at this point in the history
  • Loading branch information
smueller18 committed Sep 22, 2017
1 parent cef7616 commit 0f12060
Show file tree
Hide file tree
Showing 8 changed files with 153 additions and 178 deletions.
52 changes: 33 additions & 19 deletions flink-serialization.iml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,19 @@
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
<orderEntry type="library" name="Maven: com.github.smueller18:avro-builder:0.4" level="project" />
<orderEntry type="library" name="Maven: org.apache.flink:flink-connector-kafka-0.10_2.11:1.2.0" level="project" />
<orderEntry type="library" name="Maven: org.apache.flink:flink-connector-kafka-0.9_2.11:1.2.0" level="project" />
<orderEntry type="library" name="Maven: org.apache.flink:flink-connector-kafka-base_2.11:1.2.0" level="project" />
<orderEntry type="library" name="Maven: org.apache.flink:force-shading:1.2.0" level="project" />
<orderEntry type="library" name="Maven: org.apache.flink:flink-java:1.2.0" level="project" />
<orderEntry type="library" name="Maven: org.apache.flink:flink-core:1.2.0" level="project" />
<orderEntry type="library" name="Maven: org.apache.flink:flink-annotations:1.2.0" level="project" />
<orderEntry type="library" name="Maven: org.apache.flink:flink-metrics-core:1.2.0" level="project" />
<orderEntry type="library" name="Maven: org.apache.flink:flink-shaded-hadoop2:1.2.0" level="project" />
<orderEntry type="library" name="Maven: org.junit.jupiter:junit-jupiter-api:5.0.0-M4" level="project" />
<orderEntry type="library" name="Maven: org.opentest4j:opentest4j:1.0.0-M2" level="project" />
<orderEntry type="library" name="Maven: org.junit.platform:junit-platform-commons:1.0.0-M4" level="project" />
<orderEntry type="library" name="Maven: com.github.smueller18:avro-builder:0.7" level="project" />
<orderEntry type="library" name="Maven: org.apache.avro:avro:1.8.1" level="project" />
<orderEntry type="library" name="Maven: org.codehaus.jackson:jackson-core-asl:1.9.13" level="project" />
<orderEntry type="library" name="Maven: org.codehaus.jackson:jackson-mapper-asl:1.9.13" level="project" />
Expand All @@ -31,19 +44,19 @@
<orderEntry type="library" name="Maven: com.101tec:zkclient:0.10" level="project" />
<orderEntry type="library" name="Maven: org.apache.zookeeper:zookeeper:3.4.9" level="project" />
<orderEntry type="library" name="Maven: org.scala-lang.modules:scala-parser-combinators_2.11:1.0.4" level="project" />
<orderEntry type="library" name="Maven: org.apache.flink:flink-connector-kafka-0.10_2.11:1.2.0" level="project" />
<orderEntry type="library" name="Maven: org.apache.flink:flink-connector-kafka-0.9_2.11:1.2.0" level="project" />
<orderEntry type="library" name="Maven: org.apache.flink:flink-connector-kafka-base_2.11:1.2.0" level="project" />
<orderEntry type="library" name="Maven: org.apache.flink:force-shading:1.2.0" level="project" />
<orderEntry type="library" name="Maven: org.apache.flink:flink-java:1.2.0" level="project" />
<orderEntry type="library" name="Maven: org.apache.flink:flink-core:1.2.0" level="project" />
<orderEntry type="library" name="Maven: org.apache.flink:flink-annotations:1.2.0" level="project" />
<orderEntry type="library" name="Maven: org.apache.flink:flink-metrics-core:1.2.0" level="project" />
<orderEntry type="library" name="Maven: org.apache.flink:flink-connector-kafka-0.10_2.11:1.3.2" level="project" />
<orderEntry type="library" name="Maven: org.apache.flink:flink-connector-kafka-0.9_2.11:1.3.2" level="project" />
<orderEntry type="library" name="Maven: org.apache.flink:flink-connector-kafka-base_2.11:1.3.2" level="project" />
<orderEntry type="library" name="Maven: org.apache.flink:force-shading:1.3.2" level="project" />
<orderEntry type="library" name="Maven: org.apache.flink:flink-java:1.3.2" level="project" />
<orderEntry type="library" name="Maven: org.apache.flink:flink-core:1.3.2" level="project" />
<orderEntry type="library" name="Maven: org.apache.flink:flink-annotations:1.3.2" level="project" />
<orderEntry type="library" name="Maven: org.apache.flink:flink-metrics-core:1.3.2" level="project" />
<orderEntry type="library" name="Maven: com.esotericsoftware.kryo:kryo:2.24.0" level="project" />
<orderEntry type="library" name="Maven: com.esotericsoftware.minlog:minlog:1.2" level="project" />
<orderEntry type="library" name="Maven: org.objenesis:objenesis:2.1" level="project" />
<orderEntry type="library" name="Maven: commons-collections:commons-collections:3.2.2" level="project" />
<orderEntry type="library" name="Maven: org.apache.flink:flink-shaded-hadoop2:1.2.0" level="project" />
<orderEntry type="library" name="Maven: org.apache.flink:flink-shaded-hadoop2:1.3.2" level="project" />
<orderEntry type="library" name="Maven: commons-cli:commons-cli:1.3.1" level="project" />
<orderEntry type="library" name="Maven: xmlenc:xmlenc:0.52" level="project" />
<orderEntry type="library" name="Maven: commons-codec:commons-codec:1.4" level="project" />
Expand All @@ -67,15 +80,16 @@
<orderEntry type="library" name="Maven: org.apache.commons:commons-math3:3.5" level="project" />
<orderEntry type="library" name="Maven: org.slf4j:slf4j-api:1.7.7" level="project" />
<orderEntry type="library" name="Maven: com.google.code.findbugs:jsr305:1.3.9" level="project" />
<orderEntry type="library" name="Maven: io.confluent:kafka-avro-serializer:3.2.1" level="project" />
<orderEntry type="library" name="Maven: io.confluent:kafka-schema-registry-client:3.2.1" level="project" />
<orderEntry type="library" name="Maven: com.fasterxml.jackson.core:jackson-databind:2.5.4" level="project" />
<orderEntry type="library" name="Maven: com.fasterxml.jackson.core:jackson-annotations:2.5.0" level="project" />
<orderEntry type="library" name="Maven: com.fasterxml.jackson.core:jackson-core:2.5.4" level="project" />
<orderEntry type="library" name="Maven: io.confluent:common-config:3.2.1" level="project" />
<orderEntry type="library" name="Maven: io.confluent:common-utils:3.2.1" level="project" />
<orderEntry type="library" name="Maven: org.junit.jupiter:junit-jupiter-api:5.0.0-M4" level="project" />
<orderEntry type="library" name="Maven: org.opentest4j:opentest4j:1.0.0-M2" level="project" />
<orderEntry type="library" name="Maven: org.junit.platform:junit-platform-commons:1.0.0-M4" level="project" />
<orderEntry type="library" name="Maven: io.confluent:kafka-avro-serializer:3.3.0" level="project" />
<orderEntry type="library" name="Maven: io.confluent:kafka-schema-registry-client:3.3.0" level="project" />
<orderEntry type="library" name="Maven: com.fasterxml.jackson.core:jackson-databind:2.8.4" level="project" />
<orderEntry type="library" name="Maven: com.fasterxml.jackson.core:jackson-annotations:2.8.0" level="project" />
<orderEntry type="library" name="Maven: com.fasterxml.jackson.core:jackson-core:2.8.4" level="project" />
<orderEntry type="library" name="Maven: io.confluent:common-config:3.3.0" level="project" />
<orderEntry type="library" name="Maven: io.confluent:common-utils:3.3.0" level="project" />
<orderEntry type="library" name="Maven: org.junit.jupiter:junit-jupiter-api:5.0.0" level="project" />
<orderEntry type="library" name="Maven: org.apiguardian:apiguardian-api:1.0.0" level="project" />
<orderEntry type="library" name="Maven: org.opentest4j:opentest4j:1.0.0" level="project" />
<orderEntry type="library" name="Maven: org.junit.platform:junit-platform-commons:1.0.0" level="project" />
</component>
</module>
8 changes: 4 additions & 4 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>com.github.smueller18</groupId>
<artifactId>flink-serialization</artifactId>
<version>0.8</version>
<version>0.10.0</version>
<packaging>jar</packaging>

<name>Flink-Serialization</name>
Expand All @@ -15,10 +15,10 @@
<properties>
<avro.builder.version>0.7</avro.builder.version>
<kafka.version>0.10.2.1</kafka.version>
<avro.version>1.8.1</avro.version>
<flink.version>1.3.0</flink.version>
<avro.version>1.8.2</avro.version>
<flink.version>1.3.2</flink.version>
<scala.version>2.11</scala.version>
<confluent.version>3.2.1</confluent.version>
<confluent.version>3.3.0</confluent.version>
</properties>

<repositories>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,55 +1,80 @@
/*
* The MIT License (MIT)
*
* Copyright (c) 2017 Stephan Müller
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/


package com.github.smueller18.flink.serialization;

import java.util.Map;

import com.github.smueller18.avro.builder.AvroBuilder;
import kafka.utils.VerifiableProperties;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import io.confluent.kafka.serializers.KafkaAvroSerializer;

import java.util.Properties;

/**
* Copyright 2017 Stephan Müller
* License: MIT
*/
public class AvroBuilderKeyedSerializationSchema<T extends AvroBuilder> implements KeyedSerializationSchema<T> {

private Properties props;
private Map<String, ?> configs;

private transient KafkaAvroKeyEncoder keyEncoder;
private transient KafkaAvroValueEncoder valueEncoder;
private transient KafkaAvroSerializer keySerializer;
private transient KafkaAvroSerializer valueSerializer;

/***
*
* @param props properties for {@link KafkaAvroKeyEncoder} and {@link KafkaAvroValueEncoder}
* @param configs key-value-pairs for {@link KafkaAvroSerializer}
* schema.registry.url ({@link String}):
* Comma-separated list of URLs for schema registry instances that can be used to register or look up schemas
* max.schemas.per.subject ({@link Integer}, default: 1000):
* Maximum number of schemas to create or cache locally
*
*/
public AvroBuilderKeyedSerializationSchema(Properties props) {
this.props = props;
public AvroBuilderKeyedSerializationSchema(Map<String, ?> configs) {
this.configs = configs;
}

@Override
public byte[] serializeKey(T avroBuilder) {
public byte[] serializeKey(T avroBuilderObject) {

if (this.keyEncoder == null)
this.keyEncoder = new KafkaAvroKeyEncoder(new VerifiableProperties(this.props));
if (this.valueSerializer == null) {
this.valueSerializer = new KafkaAvroSerializer();
this.valueSerializer.configure(this.configs, true);
}

return this.keyEncoder.toBytes(avroBuilder.getKeyRecord());
return this.keySerializer.serialize(AvroBuilder.getTopicName(avroBuilderObject.getClass()), avroBuilderObject.getKeyRecord());
}

@Override
public byte[] serializeValue(T avroBuilder) {

if (this.valueEncoder == null)
this.valueEncoder = new KafkaAvroValueEncoder(new VerifiableProperties(this.props));
public byte[] serializeValue(T avroBuilderObject) {

return this.valueEncoder.toBytes(avroBuilder.getValueRecord());
if (this.keySerializer == null) {
this.keySerializer = new KafkaAvroSerializer();
this.keySerializer.configure(this.configs, false);
}
return this.valueSerializer.serialize(AvroBuilder.getTopicName(avroBuilderObject.getClass()), avroBuilderObject.getValueRecord());
}

@Override
public String getTargetTopic(T avroBuilder) {
public String getTargetTopic(T avroBuilderObject) {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,31 @@
/*
* The MIT License (MIT)
*
* Copyright (c) 2017 Stephan Müller
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package com.github.smueller18.flink.serialization;

import org.apache.avro.generic.GenericRecord;

/**
* Copyright 2017 Stephan Müller
* License: MIT
*/

public class GenericKeyValueRecord {

Expand Down

This file was deleted.

This file was deleted.

Loading

0 comments on commit 0f12060

Please sign in to comment.