The repository is organized as follows:
- An explanation of the concept of Keyed Watermarks.
- Instructions for setting up and running an Apache Flink cluster with Keyed Watermarks.
- The repository includes:
  - JAR files and source Java classes for Keyed Watermarks.
  - A testing pipeline.
  - A Dockerfile for building the cluster.
  - Detailed usage instructions.
Big Data stream processing engines, exemplified by tools like Apache Flink, employ windowing techniques to manage unbounded streams of events. Aggregating the relevant data within windows is of utmost importance for event-time windowing because of its impact on result accuracy. A pivotal role in this process is played by watermarks: special timestamps that signify the progression of events in time. Nonetheless, the existing watermark generation method in Apache Flink, which operates at the input-stream level, exhibits a bias towards faster sub-streams, causing events from slower sub-streams to be omitted. Through our analysis, we determined that Apache Flink's standard watermark generation approach results in an approximate
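The bias described above can be illustrated with a minimal, self-contained Java sketch. This is not the repository's implementation; the key names, timestamps, and simplified lateness rule are invented for illustration only. With a single stream-level watermark, the fast key's timestamps drag the watermark past the slow key's events, so those events arrive "late"; with one watermark per key, each key progresses independently.

```java
import java.util.*;

// Minimal sketch (NOT the repository's code): why a single stream-level
// watermark penalizes slow sub-streams. Each event carries a key and an
// event-time timestamp; an event counts as "late" if the governing
// watermark has already advanced past its timestamp when it arrives.
public class KeyedWatermarkSketch {

    public record Event(String key, long ts) {}

    // Counts late events under either a single global watermark
    // (keyed == false) or one watermark per key (keyed == true).
    public static long lateEvents(List<Event> stream, boolean keyed) {
        Map<String, Long> perKeyWm = new HashMap<>(); // per-key watermarks
        long globalWm = Long.MIN_VALUE;               // stream-level watermark
        long late = 0;
        for (Event e : stream) {
            long current = keyed
                    ? perKeyWm.getOrDefault(e.key(), Long.MIN_VALUE)
                    : globalWm;
            if (e.ts() <= current) {
                late++; // the watermark already passed this event
            }
            // Advance the relevant watermark from the observed timestamp.
            if (keyed) {
                perKeyWm.merge(e.key(), e.ts(), Math::max);
            } else {
                globalWm = Math.max(globalWm, e.ts());
            }
        }
        return late;
    }

    public static void main(String[] args) {
        // "fast" progresses quickly in event time; "slow" lags behind.
        List<Event> stream = List.of(
                new Event("fast", 100), new Event("slow", 10),
                new Event("fast", 200), new Event("slow", 20),
                new Event("fast", 300), new Event("slow", 30));
        // With one global watermark, every "slow" event arrives late.
        System.out.println("late (global watermark): " + lateEvents(stream, false));
        // With per-key watermarks, each key advances independently.
        System.out.println("late (keyed watermarks): " + lateEvents(stream, true));
    }
}
```

In this toy interleaving, the global watermark marks all three "slow" events as late, while keyed watermarks mark none.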
Set up an Apache Flink Cluster using Docker
- Clone this repo using the following command:
  ```
  git clone https://github.com/TawfikYasser/Keyed-Watermarks-in-Apache-Flink.git
  ```
- Download the `flink` source code with keyed watermarks from the following link.
- Now you can build Flink: `cd` into the source code you downloaded in the previous step and follow the build instructions.
- After a successful build of Flink, put the `build-target` folder inside the repo folder.
- Now the repo contains the following files:
- /build-target
- /configurations
- /keyed-watermarks-code-base
- /pipeline
- Dockerfile
- ExperimentalSetup.png
- LICENSE
- README.md
- code-listings.txt
- scenario.png
- Run the following command in the repo folder; a Docker image will be created, which we will use next to create the containers:
  ```
  docker build -t <put-your-docker-image-name-here> .
  ```
- Then we need to create 3 Docker containers: one for the `JobManager`, and two for the `TaskManagers`.
- To create the `JobManager` container, run the following command:
  ```
  docker run -it --name JM -p 8081:8081 --network bridge <put-your-docker-image-name-here>:latest
  ```
  - We're exposing port 8081 in order to access the Apache Flink Web UI from outside the containers.
  - We're also attaching the container to the `bridge` network.
- To create the `TaskManagers`, run the following two commands:
  ```
  docker run -it --name TM1 --network bridge <put-your-docker-image-name-here>:latest
  docker run -it --name TM2 --network bridge <put-your-docker-image-name-here>:latest
  ```
- Now you have to configure the `masters`, `workers`, and `flink-config.yml` files on each container as follows:
- Start the containers, and start the `ssh` service on the TaskManager containers using the following command:
  ```
  service ssh start
  ```
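As an illustrative sketch only (the container names and the name-resolution behavior below are assumptions based on the commands above, not values taken from this repository), the `masters` and `workers` files typically hold the JobManager endpoint and one TaskManager host per line:

```
# masters: the JobManager host and its web UI port
JM:8081

# workers: one line per TaskManager
# (use the containers' IP addresses instead if the names
#  don't resolve on the default bridge network)
TM1
TM2
```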
- Now you're ready to start the cluster. Open the `JobManager` container using:
  ```
  docker exec -it JM bash
  ```
- Go to `/home/flink/bin/` and run the cluster using:
  ```
  ./start-cluster.sh
  ```
- To copy any file (e.g. the job JAR file) into the container, use the following command outside the container:
  ```
  docker cp <file-path> JM:<path-inside-the-container>
  ```
- Run a Flink job inside `/home/flink/bin/` using the following command:
  ```
  ./flink run <jar-path-inside-the-container>
  ```
  - (`-p` is optional, if you want to override the default parallelism set in `flink-config.yml`.)
- Open the Web UI of Apache Flink using: `http://localhost:8081`
  - (If you're running the containers on a VM, use the VM's external IP; otherwise use your local machine's IP.)
- All datasets used are in this link.
- The main dataset used contains around 6M records.
- The secondary dataset used contains around 49M records.
- The code can be found here.
- The pipeline code contains the Flink Java code to run the Accuracy, Latency, and State Size experiments for all datasets.
- All you need is to build the pipeline project and copy the `.jar` file into the JM container to run it as a Flink job.
- In the pipeline project you need to import the following JAR files:
- flink-connector-files-1.17-SNAPSHOT.jar
- flink-dist_2.12-1.17-SNAPSHOT.jar
- flink-shaded-zookeeper-3-3.5.9-15.0.jar
- log4j-api-2.20.0
- log4j-core-2.20.0
- Accuracy Experiments:
- You've built the Flink source code and got the `build-target`.
- Update the input and output paths in the pipeline code before building the pipeline JAR.
- Car IDs text files are here.
- Next, build the pipeline code of the accuracy experiments; you can find it here.
- Move the pipeline JAR into the container, then run the Flink job using:
  ```
  ./bin/flink run <jar-path-inside-the-container>
  ```
- After running the job, 8 sub-jobs (for each dataset) will generate output files.
- Finally, you can run the Python pipelines to generate the final accuracy results & graphs.
- Latency Experiments (Keyed):
- Same steps as in Accuracy, with some changes.
- For Keyed, we need to uncomment some lines in the `KeyedTimestampsAndWatermarksOperator` class to allow logging the start timestamp of watermark generation.
- We also need to allow logging the end timestamp of watermark generation in the `WindowOperator` class.
- Finally, rebuild the project, replace the existing `build-target` inside the containers with the new one, and run the latency pipeline job.
- The `WindowOperator` class for the Latency experiments can be found here.
- Latency Experiments (Non-keyed):
- Same steps as in Accuracy, with some changes.
- For Non-keyed, we need to uncomment some lines in the `TimestampsAndWatermarksOperator` class to allow logging the start timestamp of watermark generation.
- We also need to allow logging the end timestamp of watermark generation in the `WindowOperator` class.
- Finally, rebuild the project, replace the existing `build-target` inside the containers with the new one, and run the latency pipeline job.
- The `WindowOperator` class for the Latency experiments can be found here.
- State Size Experiments (Keyed):
- Same steps as in Accuracy, with some changes.
- We need to allow logging the state size in the `WindowOperator` class; replace the `WindowOperator` class with the one attached here. (Use the default `TimestampsAndWatermarksOperator` and `KeyedTimestampsAndWatermarksOperator` classes.)
- Finally, rebuild the project, replace the existing `build-target` inside the containers with the new one, and run the state size pipeline job.
```
@INPROCEEDINGS{10296717,
  author={Yasser, Tawfik and Arafa, Tamer and El-Helw, Mohamed and Awad, Ahmed},
  booktitle={2023 5th Novel Intelligent and Leading Emerging Sciences Conference (NILES)},
  title={Keyed Watermarks: A Fine-grained Tracking of Event-time in Apache Flink},
  year={2023},
  pages={23-28},
  doi={10.1109/NILES59815.2023.10296717}}
```
IMPORTANT: Code Base of Keyed Watermarks & Ready to Run Flink Cluster