add a simple flink and an aggregated join
shawkins committed Oct 23, 2020
1 parent 37ec064 commit 6899eba
Showing 10 changed files with 567 additions and 85 deletions.
88 changes: 88 additions & 0 deletions flink-join/README.md
@@ -0,0 +1,88 @@
# Flink Foreign Key Joins

This example demonstrates how two Debezium change data topics can be joined via Flink.

The source database contains two tables, `customers` and `addresses`, with a foreign key relationship from the latter to the former,
i.e. a customer can have multiple addresses.

Using Flink, the change events for the parent `customers` table are represented as a dynamic table defined with `CREATE TABLE`, while the child `addresses` are represented as a stream of POJOs that is first aggregated by the foreign key.
Each insertion, update, or deletion of a record on either side re-triggers the join.
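
Conceptually, the aggregated result resembles what the following Flink SQL would produce (shown for illustration only; the actual job builds the aggregate from the address stream before joining):

```sql
SELECT c.id, c.first_name, c.last_name, c.email,
       COLLECT(a.street) AS streets -- one multiset entry per address of this customer
FROM customers AS c
JOIN addresses AS a ON c.id = a.customer_id
GROUP BY c.id, c.first_name, c.last_name, c.email;
```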

## Building

Prepare the Java components by first performing a Maven build.

```console
$ mvn clean install
```

## Environment

Set up the necessary environment variables:

```console
$ export DEBEZIUM_VERSION=1.2
```

The `DEBEZIUM_VERSION` variable specifies which version of the Debezium artifacts should be used.

## Start the demo

Start all Debezium components:

```console
$ docker-compose up connect
```

This creates the Kafka Connect service and all dependent services defined in the `docker-compose.yaml` file.
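
You can confirm that all services are up with `docker-compose ps`, which lists each container and its state:

```console
$ docker-compose ps
```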

## Configure the Debezium connector

Register the connector to stream changes from the `inventory` schema of the Postgres database:

```console
$ http PUT http://localhost:8083/connectors/inventory-connector/config < register-postgres.json
HTTP/1.1 201 Created
```
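
To check that the connector was created and its task is running, you can query the Kafka Connect REST API:

```console
$ http http://localhost:8083/connectors/inventory-connector/status
```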

## Run the Flink Job

To run the Flink job in local mode, simply compile and start the job class:

```console
$ mvn clean install
$ mvn exec:java \
-Dexec.mainClass="io.debezium.examples.flink.join.FlinkJoinTableStream" \
-Dexec.classpathScope=compile
```

Running the Flink job against a remote cluster is a little more involved. The simplest approach is to create a Docker Compose session cluster (`docker-compose up jobmanager`), copy the Flink Kafka dependency into the cluster's `lib` directory, and follow the instructions for submitting this project's jar as a job - see the [Flink docs](https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/docker.html#session-cluster-with-docker-compose).
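
A sketch of that flow, assuming the compose services defined in this example and the jar produced by the Maven build (container paths and the location of the connector jar may differ in your environment):

```console
$ docker-compose up -d jobmanager taskmanager
$ docker cp ~/.m2/repository/org/apache/flink/flink-sql-connector-kafka_2.11/1.11.2/flink-sql-connector-kafka_2.11-1.11.2.jar \
    "$(docker-compose ps -q jobmanager)":/opt/flink/lib/
$ docker cp target/flink-join-1.0.0-SNAPSHOT.jar "$(docker-compose ps -q jobmanager)":/job.jar
$ docker-compose exec jobmanager flink run -c io.debezium.examples.flink.join.FlinkJoinTableStream /job.jar
```

Note that the cluster may need to be restarted after adding the connector jar to `lib` so that it is picked up on the classpath.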

## Review the outcome

Examine the joined events using _kafkacat_:

```console
$ docker run --tty --rm \
    --network flink-join-network \
    debezium/tooling:1.1 \
    kafkacat -b kafka:9092 -C -o beginning -q \
    -t customers-with-addresses | jq .
```
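
To also see each event's key, kafkacat's format option can be used instead of piping to jq:

```console
$ docker run --tty --rm \
    --network flink-join-network \
    debezium/tooling:1.1 \
    kafkacat -b kafka:9092 -C -o beginning -q \
    -t customers-with-addresses -f '%k : %s\n'
```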

## Useful Commands

Getting a session in the Postgres database:

```console
$ docker run --tty --rm -i \
    --network flink-join-network \
    debezium/tooling:1.1 \
    bash -c 'pgcli postgresql://postgres:postgres@postgres:5432/postgres'
```

E.g. to update a customer record:

```sql
update inventory.customers set first_name = 'Sarah' where id = 1001;
```
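
Similarly, inserting a new address for an existing customer re-triggers the join:

```sql
insert into inventory.addresses values (default, 1001, '3 Debezium Plaza', 'Monterey', '90213', 'US');
```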
76 changes: 76 additions & 0 deletions flink-join/docker-compose.yaml
@@ -0,0 +1,76 @@
version: '3.5'

services:

  zookeeper:
    image: debezium/zookeeper:${DEBEZIUM_VERSION}
    ports:
      - 2181:2181
      - 2888:2888
      - 3888:3888
    networks:
      - my-network
  kafka:
    image: debezium/kafka:${DEBEZIUM_VERSION}
    ports:
      - 9092:9092
    links:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_GROUP_MIN_SESSION_TIMEOUT_MS=100
    networks:
      - my-network

  postgres:
    image: debezium/example-postgres:${DEBEZIUM_VERSION}
    ports:
      - 5432:5432
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
    volumes:
      - ./inventory-addresses.sql:/docker-entrypoint-initdb.d/zzz.sql
    networks:
      - my-network

  connect:
    image: debezium/connect:${DEBEZIUM_VERSION}
    ports:
      - 8083:8083
    links:
      - kafka
      - postgres
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
    networks:
      - my-network

  jobmanager:
    image: flink:1.11.2-scala_2.11
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
  taskmanager:
    image: flink:1.11.2-scala_2.11
    depends_on:
      - jobmanager
    command: taskmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2

networks:
  my-network:
    name: flink-join-network
18 changes: 18 additions & 0 deletions flink-join/inventory-addresses.sql
@@ -0,0 +1,18 @@
CREATE TABLE inventory.addresses (
  id SERIAL NOT NULL PRIMARY KEY,
  customer_id INTEGER NOT NULL,
  street VARCHAR(255) NOT NULL,
  city VARCHAR(255) NOT NULL,
  zipcode VARCHAR(255) NOT NULL,
  country VARCHAR(255) NOT NULL,
  FOREIGN KEY (customer_id) REFERENCES inventory.customers(id)
);
ALTER SEQUENCE inventory.addresses_id_seq RESTART WITH 100001;
ALTER TABLE inventory.addresses REPLICA IDENTITY FULL;

INSERT INTO inventory.addresses
VALUES (default, 1001, '42 Main Street', 'Hamburg', '90210', 'Canada'),
       (default, 1001, '11 Post Dr.', 'Berlin', '90211', 'Canada'),
       (default, 1002, '12 Rodeo Dr.', 'Los Angeles', '90212', 'US'),
       (default, 1002, '1 Debezium Plaza', 'Monterey', '90213', 'US'),
       (default, 1002, '2 Debezium Plaza', 'Monterey', '90213', 'US');
71 changes: 71 additions & 0 deletions flink-join/pom.xml
@@ -0,0 +1,71 @@
<?xml version="1.0"?>
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
    <modelVersion>4.0.0</modelVersion>

    <groupId>io.debezium.examples.flink.join</groupId>
    <artifactId>flink-join</artifactId>
    <version>1.0.0-SNAPSHOT</version>
    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <surefire-plugin.version>2.22.0</surefire-plugin.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.11</artifactId>
            <version>1.11.2</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.11</artifactId>
            <version>1.11.2</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.11.2</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-sql-connector-kafka_2.11</artifactId>
            <version>1.11.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>1.11.2</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>1.11.2</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.11.2</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>${surefire-plugin.version}</version>
                <configuration>
                    <systemProperties>
                        <java.util.logging.manager>org.jboss.logmanager.LogManager</java.util.logging.manager>
                    </systemProperties>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
16 changes: 16 additions & 0 deletions flink-join/register-postgres.json
@@ -0,0 +1,16 @@
{
  "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
  "tasks.max": "1",
  "database.hostname": "postgres",
  "database.port": "5432",
  "database.user": "postgres",
  "database.password": "postgres",
  "database.dbname": "postgres",
  "database.server.name": "dbserver1",
  "schema.whitelist": "inventory",
  "decimal.handling.mode": "string",
  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "key.converter.schemas.enable": "false",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "false"
}
90 changes: 90 additions & 0 deletions flink-join/src/main/java/io/debezium/examples/flink/join/FlinkJoin.java
@@ -0,0 +1,90 @@
package io.debezium.examples.flink.join;

import java.util.Properties;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.formats.json.JsonRowSerializationSchema;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
* Performs an inner join of a customer and an address
*/
public class FlinkJoin {

    public static String TOPIC_OUT = "customer-with-address";

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // expose the Debezium change topic for customers as a dynamic table
        tableEnv.executeSql("CREATE TABLE customers (\n" +
                " id INT PRIMARY KEY,\n" +
                " first_name STRING,\n" +
                " last_name STRING,\n" +
                " email STRING\n" +
                ") WITH (\n" +
                " 'connector' = 'kafka',\n" +
                " 'topic' = 'dbserver1.inventory.customers',\n" +
                " 'properties.bootstrap.servers' = 'localhost:9092',\n" +
                " 'properties.group.id' = '1',\n" +
                " 'format' = 'debezium-json',\n" +
                " 'scan.startup.mode' = 'earliest-offset'\n" +
                ")");

        // expose the Debezium change topic for addresses as a dynamic table
        tableEnv.executeSql("CREATE TABLE addresses (\n" +
                " id BIGINT PRIMARY KEY,\n" +
                " customer_id INT,\n" +
                " street STRING,\n" +
                " city STRING,\n" +
                " zipcode STRING,\n" +
                " country STRING\n" +
                ") WITH (\n" +
                " 'connector' = 'kafka',\n" +
                " 'topic' = 'dbserver1.inventory.addresses',\n" +
                " 'properties.bootstrap.servers' = 'localhost:9092',\n" +
                " 'properties.group.id' = '1',\n" +
                " 'format' = 'debezium-json',\n" +
                " 'scan.startup.mode' = 'earliest-offset'\n" +
                ")");

        // continuous inner join; each change on either side re-evaluates the result
        Table addressWithEmail = tableEnv.sqlQuery("select c.id, c.email, a.country, a.id as address_id "
                + "from customers as c inner join addresses as a on c.id = a.customer_id");

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");

        //-- coming soon in flink, we should be able to output a changelog/cdc stream
        //DebeziumJsonSerializationSchema schema ...

        // retract stream: the Boolean flag marks an addition (true) vs. a retraction (false)
        DataStream<Tuple2<Boolean, Row>> output = tableEnv.toRetractStream(addressWithEmail, Row.class);

        TypeInformation<Row> rowType = ((TupleTypeInfo) output.getType()).getTypeAt(1);
        JsonRowSerializationSchema rowSerialization = JsonRowSerializationSchema.builder().withTypeInfo(rowType).build();
        //output the key as the first field
        JsonRowSerializationSchema keySerialization = JsonRowSerializationSchema.builder()
                .withTypeInfo(new RowTypeInfo(Types.INT)).build();

        // retractions are written as tombstones (null values) keyed by the join key
        FlinkKafkaProducer<Tuple2<Boolean, Row>> kafkaProducer = new FlinkKafkaProducer<Tuple2<Boolean, Row>>(TOPIC_OUT,
                ((record, timestamp) -> new ProducerRecord<byte[], byte[]>(TOPIC_OUT,
                        keySerialization.serialize(record.f1),
                        record.f0 ? rowSerialization.serialize(record.f1) : null)),
                properties,
                Semantic.EXACTLY_ONCE);

        output.addSink(kafkaProducer);

        env.execute("Debezium Join");
    }

}