The purpose of this demo is to show a complete machine learning pipeline, including loading and streaming data from a database. The demo classifies handwritten digits from the MNIST data set using a simple neural network model built and run with TensorFlow. The training data sample as well as the test sample is stored in a Postgres database. It accompanies this post on the Debezium blog.
First we need to download the MNIST data samples and convert them into a form suitable for loading into the database. Running the following Python script downloads the MNIST samples and creates SQL files from them:
$ ./mnist2sql.py --download
If you have already downloaded the samples and only want to re-generate the SQL files, run:
$ ./mnist2sql.py
The data samples as well as the SQL files are located in the postgres directory.
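For a rough idea of what the script does, here is a minimal sketch of the conversion; the actual mnist2sql.py may differ, and the column names id, pixels, and label are assumptions:

import gzip
import struct

def idx_to_sql(images_path, labels_path, table, out_path):
    # Parse the IDX image file: 16-byte header, then rows*cols bytes per image.
    with gzip.open(images_path, "rb") as f:
        _, n, rows, cols = struct.unpack(">IIII", f.read(16))
        images = [f.read(rows * cols) for _ in range(n)]
    # Parse the IDX label file: 8-byte header, then one byte per label.
    with gzip.open(labels_path, "rb") as f:
        struct.unpack(">II", f.read(8))
        labels = f.read(n)
    # Emit one INSERT statement per sample.
    with open(out_path, "w") as out:
        for i, (img, label) in enumerate(zip(images, labels)):
            pixels = ",".join(str(b) for b in img)
            out.write(f"INSERT INTO {table} (id, pixels, label) "
                      f"VALUES ({i}, '{pixels}', {label});\n")

idx_to_sql("train-images-idx3-ubyte.gz", "train-labels-idx1-ubyte.gz",
           "mnist_train", "postgres/mnist_train.sql")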
This demo uses a custom single message transform (SMT). You need to build it first:
$ mvn clean package -f connect/mnist-smt/pom.xml
You also have to set up a couple of environment variables:
$ export DEBEZIUM_VERSION=2.2
$ export TENSORFLOW_VERSION=2.12.0
which determine the versions of Debezium and TensorFlow to use.
Now you should be able to run the demo:
$ docker-compose up --build
This builds custom images for Kafka Connect, Postgres, and TensorFlow. The custom Kafka Connect image contains the SMT built in the preparation step above, the Postgres image loads the training data from the training SQL file, and the TensorFlow image contains additional TensorFlow dependencies. Besides these containers, Kafka and ZooKeeper containers are started.
Register the Debezium Postgres connector to start streaming the training data from the Postgres database into Kafka:
$ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-postgres.json
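The connector configuration lives in register-postgres.json. For illustration, registering an equivalent configuration from Python could look like the sketch below; the connector name, credentials, and table list are assumptions, and the custom SMT settings from the real file are omitted:

import json
import requests

config = {
    "name": "mnist-connector",  # assumed connector name
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "postgres",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "postgres",
        "database.dbname": "postgres",
        # "tf" matches the topic prefix seen in tf.public.mnist_train
        "topic.prefix": "tf",
        "table.include.list": "public.mnist_train,public.mnist_test",
    },
}

resp = requests.post(
    "http://localhost:8083/connectors/",
    headers={"Content-Type": "application/json"},
    data=json.dumps(config),
)
print(resp.status_code, resp.text)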
You can check that the training sample is already in the Kafka topic:
$ docker-compose exec kafka /kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --from-beginning --property print.key=true --topic tf.public.mnist_train
The TensorFlow image includes support for Jupyter notebooks. To find the authorization token, check the TensorFlow logs:
$ docker-compose logs tensorflow
and navigate to http://127.0.0.1:8888 with the proper authorization token, e.g. http://127.0.0.1:8888/?token=990927a7a62345ff5f8e244083e94d850b650232d80b44d2.
The demo contains a prepared Jupyter notebook, mnist_kafka.ipynb.
Upload and open the notebook and run its first three cells.
These cells contain the initial setup and load the training data from Kafka.
The third cell defines a very simple neural network model and trains it on the data loaded in the previous cell.
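To give an idea of what these cells contain, here is a minimal sketch based on the tensorflow-io Kafka consumer API; the record format (comma-separated pixel values in the message value, the label in the key, as produced by the custom SMT) and the exact model architecture are assumptions — the notebook itself is authoritative:

import tensorflow as tf
import tensorflow_io as tfio

# Consume the training topic as a consumer group, reading from the beginning.
train_ds = tfio.experimental.streaming.KafkaGroupIODataset(
    topics=["tf.public.mnist_train"],
    group_id="traincg",
    servers="kafka:9092",
    stream_timeout=10000,  # stop after 10 s without new records
    configuration=["auto.offset.reset=earliest"],
)

def decode(raw_message, raw_key):
    # Assumed record format: 784 comma-separated pixel values in the
    # message value, the digit label in the key.
    pixels = tf.io.decode_csv(raw_message, [[0.0]] * 784)
    image = tf.stack(pixels) / 255.0
    label = tf.strings.to_number(raw_key, tf.int32)
    return image, label

train_ds = train_ds.map(decode).batch(64)

# A very simple feed-forward classifier.
model = tf.keras.Sequential([
    tf.keras.layers.Input(shape=(784,)),
    tf.keras.layers.Dense(128, activation="relu"),
    tf.keras.layers.Dense(10, activation="softmax"),
])
model.compile(optimizer="adam",
              loss="sparse_categorical_crossentropy",
              metrics=["accuracy"])
model.fit(train_ds)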
You can evaluate the model using a test data sample.
The data preparation script has already downloaded this sample and prepared a SQL file from it.
However, this data is not loaded into Postgres yet.
To demonstrate streaming data from the database to TensorFlow in real time, as it is stored in the database, we will start the model evaluation first and only then insert the test data into the database.
Run the fourth cell of the notebook.
The client is configured to time out after 9 seconds, so you have to load some data into the Postgres mnist_test table within this time frame.
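Conceptually, the fourth cell consumes the test topic with a bounded timeout and evaluates the model on whatever records arrive, along the lines of the sketch below (the topic name tf.public.mnist_test and the decode function from the training sketch above are assumptions):

# Stream the test topic; give up after 9 s without new records.
test_ds = tfio.experimental.streaming.KafkaGroupIODataset(
    topics=["tf.public.mnist_test"],
    group_id="testcg",
    servers="kafka:9092",
    stream_timeout=9000,
    configuration=["auto.offset.reset=earliest"],
)
test_ds = test_ds.map(decode).batch(64)

loss, accuracy = model.evaluate(test_ds)
print(f"loss: {loss:.4f}, accuracy: {accuracy:.4f}")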
To populate the test table, run:
$ export PGPASSWORD=postgres
$ psql -h localhost -U postgres -f postgres/mnist_test.sql
After a while you should see the model evaluation, i.e. its accuracy and loss.
The remaining cells contain a function for plotting a single digit and predicting which number is shown in the image. For convenience, the pixels of two images are hard-coded, so you can use them with this function and evaluate for yourself whether the digit in the picture corresponds to what the model predicts. You can also re-read the images directly from the Kafka stream, or create a new stream, and try the model predictions as the data is streamed to the model from the database. This is shown in the last cell, where the first four records from the test stream are read and their images are displayed together with the model predictions.
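A sketch of what such a plotting-and-prediction helper might look like (the function name and the flat 784-pixel input format are assumptions):

import matplotlib.pyplot as plt
import numpy as np

def plot_and_predict(pixels):
    # `pixels` is assumed to be a flat vector of 784 raw grayscale
    # values (0-255), as stored in the database.
    image = np.asarray(pixels, dtype=np.float32).reshape(28, 28)
    plt.imshow(image, cmap="gray")
    plt.show()
    prediction = model.predict(image.reshape(1, 784) / 255.0)
    print("predicted digit:", int(np.argmax(prediction)))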