Example app to demonstrate Kafka Streams with Kotlin.
Kafka Streams Topology is configured in AdjustmentsStreams.kt, as follows:
- Source is the adjust-balance topic.
- Records flow downstream through a stateful transformer:
  - If the account retains a positive balance, the balance adjustment is accepted and the balance is persisted in a balance-store state store.
  - If the adjustment would result in an overdraft, the balance adjustment is rejected.
- Stream is materialized to:
  - balance-adjustment topic if successful
  - balance-adjustment-rejected topic if rejected
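The accept/reject rule described above can be sketched in plain Kotlin, without any Kafka dependency. This is an illustration under stated assumptions, not the code from AdjustmentsStreams.kt: the `Adjustment` input shape, the `BalanceAdjuster` class, and the starting balances are hypothetical, a `MutableMap` stands in for the balance-store state store, and a resulting balance of exactly zero is assumed to be accepted. The output field names are taken from the sample records shown later in this README.

```kotlin
// Hypothetical input shape: the README does not show the adjust-balance record format.
data class Adjustment(val accountId: String, val amount: Int)

// The two possible outcomes, mirroring the two output topics.
sealed class AdjustmentResult
data class Accepted(val accountId: String, val balance: Int, val adjustedAmount: Int) : AdjustmentResult()
data class Rejected(val accountId: String, val adjustedAmount: Int, val reason: String) : AdjustmentResult()

// A plain map stands in for the balance-store state store (assumption).
class BalanceAdjuster(private val store: MutableMap<String, Int> = mutableMapOf()) {

    fun adjust(adjustment: Adjustment): AdjustmentResult {
        // Unknown accounts are assumed to start at zero.
        val current = store[adjustment.accountId] ?: 0
        val updated = current + adjustment.amount

        return if (updated >= 0) {
            // Accepted: persist the new balance before emitting the result.
            store[adjustment.accountId] = updated
            Accepted(adjustment.accountId, updated, adjustment.amount)
        } else {
            // Rejected: the stored balance is left untouched.
            Rejected(adjustment.accountId, adjustment.amount, "Insufficient funds")
        }
    }
}

fun main() {
    // Starting balances are made up for the example.
    val adjuster = BalanceAdjuster(mutableMapOf("3" to 2920, "8" to 30))
    println(adjuster.adjust(Adjustment("3", 30)))   // accepted, would go to balance-adjustment
    println(adjuster.adjust(Adjustment("8", -40)))  // rejected, would go to balance-adjustment-rejected
}
```

In the real topology, the accepted and rejected results would be routed to the balance-adjustment and balance-adjustment-rejected topics respectively. Here they are only printed.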
Adjustments Streams App will send a new record to the adjust-balance topic every second.
Topology is tested in AdjustmentsStreamTest.kt. No real Kafka broker is used, so the tests execute very quickly with very little overhead.
Spin up only the Kafka components and run Adjustments Streams App outside docker-compose, i.e. you have to start the app manually.
The script will also create the topics defined in topics.txt.
$ ./start_kafka.sh
Spin up everything inside docker-compose:
- Kafka configured in docker-compose.yml
- Adjustments Streams App configured in adjustments-streams.yml
$ ./start_all.sh
$ ./stop_all.sh
Follow balance-adjustment / balance-adjustment-rejected topic.
$ ./follow_output_topic.sh
Follow topic balance-adjustment
{"accountId":"3","balance":2950,"adjustedAmount":30}
{"accountId":"2","balance":2530,"adjustedAmount":30}
$ ./follow_output_topic.sh rejected
Follow topic balance-adjustment-rejected
{"accountId":"8","adjustedAmount":-40,"reason":"Insufficient funds"}
{"accountId":"8","adjustedAmount":-50,"reason":"Insufficient funds"}
