This sample demonstrates a simple producer and consumer; the producer sends objects of type User1 and the consumer receives objects of type User2 (the objects have the same field, foo).
The producer uses a JsonSerializer; the consumer uses the StringDeserializer that is automatically configured by Spring Boot, together with a StringJsonMessageConverter, which converts the JSON payload to the type of the listener method argument.
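The conversion works because the JSON payload carries only field names, not the Java class that produced it. The following is a minimal, dependency-free sketch of that idea. It is not the sample's code: the real application delegates to Jackson through Spring's serializer and message converter, and the class ConversionSketch with its hand-rolled toJson and fromJson helpers is hypothetical, written only for illustration.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Simplified model of the conversion path, assuming a single String field "foo".
// The hand-rolled JSON handling is illustrative only (no escaping, no nesting).
class ConversionSketch {

    // Producer-side type.
    static final class User1 {
        final String foo;
        User1(String foo) { this.foo = foo; }
    }

    // Consumer-side type: a different class with the same field name.
    static final class User2 {
        final String foo;
        User2(String foo) { this.foo = foo; }
        @Override public String toString() { return "User2 [foo=" + foo + "]"; }
    }

    // Stand-in for the producer's JSON serialization.
    static String toJson(User1 user) {
        return "{\"foo\":\"" + user.foo + "\"}";
    }

    // Stand-in for the consumer-side conversion to the listener argument type.
    static User2 fromJson(String json) {
        Matcher m = Pattern.compile("\"foo\"\\s*:\\s*\"([^\"]*)\"").matcher(json);
        if (!m.find()) {
            throw new IllegalArgumentException("no 'foo' field in: " + json);
        }
        return new User2(m.group(1));
    }

    public static void main(String[] args) {
        String payload = toJson(new User1("bar")); // what travels over the topic
        User2 received = fromJson(payload);        // what the listener method sees
        System.out.println(payload);
        System.out.println("Received: " + received);
    }
}
```

Because nothing in the payload names User1, the consumer is free to map it onto any type that has a matching foo field.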
Run the application and use curl to send some data:
$ curl -X POST http://localhost:8080/send/user/<user-name>
Or, with HTTPie:
$ http POST :8080/send/user/yatin
2018-11-05 10:03:40.216 INFO 39766 --- [ fooGroup-0-C-1] Application : Received: User2 [name=bar]
The consumer is configured with a SeekToCurrentErrorHandler, which replays failed messages up to 3 times and, after retries are exhausted, sends the bad message to a dead-letter topic.
A second @KafkaListener consumes the raw JSON of the dead-lettered message.
2018-11-05 10:12:32.552 INFO 41635 --- [ fooGroup-0-C-1] Application : Received: User2 [foo=fail]
2018-11-05 10:12:32.561 ERROR 41635 --- [ fooGroup-0-C-1] essageListenerContainer$ListenerConsumer : Error handler threw an exception
2018-11-05 10:12:33.033 INFO 41635 --- [ fooGroup-0-C-1] Application : Received: User2 [foo=fail]
2018-11-05 10:12:33.033 ERROR 41635 --- [ fooGroup-0-C-1] essageListenerContainer$ListenerConsumer : Error handler threw an exception
2018-11-05 10:12:33.537 INFO 41635 --- [ fooGroup-0-C-1] Application : Received: User2 [foo=fail]
2018-11-05 10:12:43.359 INFO 41635 --- [ dltGroup-0-C-1] Application : Received from DLT: {"foo":"fail"}
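The retry-then-dead-letter behaviour seen in the log above can be modelled in a few lines of plain Java. This is a simplified sketch, not the SeekToCurrentErrorHandler implementation: the real handler seeks the consumer back to the failed offset so that the record is redelivered on the next poll, whereas this model simply loops. The class RetrySketch, its deliver method, and the rule that a payload containing "fail" makes the listener throw are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simplified model of "deliver up to N times, then dead-letter".
class RetrySketch {

    // Delivers the payload to the listener at most maxAttempts times.
    // Returns the number of deliveries made; if every attempt fails,
    // the payload is appended to deadLetters.
    static int deliver(String payload, Consumer<String> listener,
                       int maxAttempts, List<String> deadLetters) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                listener.accept(payload);
                return attempt; // success: no further deliveries
            } catch (RuntimeException e) {
                // failure: fall through and redeliver the same payload
            }
        }
        deadLetters.add(payload); // retries exhausted
        return maxAttempts;
    }

    public static void main(String[] args) {
        List<String> dlt = new ArrayList<>();
        // Assumed failure rule: the listener rejects payloads containing "fail".
        Consumer<String> listener = json -> {
            System.out.println("Received: " + json);
            if (json.contains("fail")) {
                throw new RuntimeException("failed");
            }
        };
        deliver("{\"foo\":\"fail\"}", listener, 3, dlt);
        // Stand-in for the second listener that reads the dead-letter topic.
        dlt.forEach(json -> System.out.println("Received from DLT: " + json));
    }
}
```

With a limit of 3, the failing payload reaches the listener three times and then appears once on the dead-letter side, which matches the shape of the log.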
Start Confluent Kafka platform
cd Workspace/Kafka/confluent-5.2.2
bin/confluent start
For Confluent Platform version 5.3.1, download the Confluent CLI and Confluent Platform, and follow the Confluent installation instructions.
confluent local start --path <path-to-confluent>
To Stop
bin/confluent stop
To Stop (Version 5.3.1)
confluent local stop
Observe new Kafka topics
kafka-topics --zookeeper <zookeeper-host:port> --list
Observe Users topic
kafka-console-consumer --bootstrap-server <broker-host:port> --topic users
Observe Kafka Streams Counts output topic
kafka-console-consumer --bootstrap-server <broker-host:port> --topic users-count