Spring Boot Kafka Streams example
Stream processing means processing data continuously as it arrives as a stream. Kafka Streams is a library for processing streams of records. The Kafka Streams DSL is a high-level API for building Kafka Streams applications. It uses the KStream object, which represents a stream of key/value records. Kafka Streams applications typically process and transform records and then send them to another topic.
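As a quick illustration of the DSL, here is a minimal sketch; the topic names "input-topic" and "output-topic" are placeholders, not the topics used later in this example:

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;

public class DslSketch {

    public static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        // a KStream is an unbounded stream of key/value records read from a topic
        KStream<String, String> source = builder.stream("input-topic");
        // transform each value and send the result to another topic
        source.mapValues(value -> value.toUpperCase()).to("output-topic");
        return builder.build();
    }
}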
Step 1) Launch ZooKeeper and Kafka server
See Spring Boot Kafka example for details on launching ZooKeeper and the Kafka server.
Step 2) Create the topic where stream data will be published
Create a topic named "data" to which a stream of records will be sent and from which the Kafka Streams processor will consume.
kafka-topics.bat --create --topic data --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092
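To verify that the topic was created, you can optionally describe it:

kafka-topics.bat --describe --topic data --bootstrap-server localhost:9092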
Step 3) Publish data to the "data" topic using the Kafka console producer
We will send data as key/value pairs using the command below. Press Ctrl+C to stop sending data.
kafka-console-producer.bat --topic data --broker-list localhost:9092 --property "parse.key=true" --property "key.separator=:"
C:\Dev\kafka_2.11-2.3.1\bin\windows>kafka-console-producer.bat --topic data --broker-list localhost:9092 --property "parse.key=true" --property "key.separator=:"
>1:asia
>2:europe
>3:australia
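Optionally, you can check that the records landed on the topic using the console consumer; print.key and key.separator are standard console-consumer properties:

kafka-console-consumer.bat --topic data --from-beginning --bootstrap-server localhost:9092 --property print.key=true --property "key.separator=:"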
Step 4) Create pom.xml for the Spring Boot Kafka Streams processor application
Create a pom.xml file as shown below and keep it in the Java project directory. It contains the Spring Boot Kafka Maven dependencies.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.6.RELEASE</version>
        <relativePath/>
    </parent>
    <groupId>com.example</groupId>
    <artifactId>demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>demo</name>
    <description>Demo project for Spring Boot Kafka Streams</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
        </dependency>
    </dependencies>
    <!-- To create an executable jar -->
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
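Once the remaining classes below are in place, you can build and run the application with the standard Maven commands; the jar name follows from the artifactId and version in this pom:

mvn clean package
java -jar target\demo-0.0.1-SNAPSHOT.jar

Alternatively, mvn spring-boot:run starts the application without packaging it first.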
Step 5) Create the KafkaStreamsConfig class which contains the Kafka Streams configuration
To configure Kafka Streams, you need to provide at least the BOOTSTRAP_SERVERS_CONFIG and APPLICATION_ID_CONFIG properties. BOOTSTRAP_SERVERS_CONFIG points the Kafka Streams application to the Kafka cluster, while APPLICATION_ID_CONFIG identifies the Kafka Streams application and must be unique per cluster.
In this example we use the high-level Streams DSL API. First we create a StreamsBuilder and use it to build a Topology, which is a DAG: a directed acyclic graph of transformations applied to the stream of events. In this example the only processing we do is printing each record to the console; we do not apply any real transformation.
package com.example;

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class KafkaStreamsConfig {

    @Bean
    public KafkaStreams kafkaStreams() {
        final Topology topology = getTopology();
        final KafkaStreams streams = new KafkaStreams(topology, streamsConfiguration());
        streams.cleanUp();
        streams.start();
        // Add shutdown hook to respond to SIGTERM and gracefully close Kafka Streams
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        return streams;
    }

    public Properties streamsConfiguration() {
        Properties streamsConfiguration = new Properties();
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
        return streamsConfiguration;
    }

    private Topology getTopology() {
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, String> input = builder.stream("data");
        input.foreach((k, v) -> System.out.println("Key = " + k + " value = " + v));
        return builder.build();
    }
}
Step 6) Create the KafkaStreamsProcessorApplication class which launches the Spring Boot application
package com.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class KafkaStreamsProcessorApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaStreamsProcessorApplication.class, args);
    }
}
Step 7) Running KafkaStreamsProcessorApplication
Start KafkaStreamsProcessorApplication and you will see the records printed on the console as key/value pairs.
Console output:
2021-03-07 20:29:24.736  INFO 16936 --- [-StreamThread-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=my-app-52667270-7861-4e1d-bdfc-3e590673faf1-StreamThread-1-consumer, groupId=my-app] Setting offset for partition data-0 to the committed offset FetchPosition{offset=5, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[LAPTOP-L5SL049K:9092 (id: 0 rack: null)], epoch=absent}}
Key = 1 value = asia
Key = 2 value = europe
Key = 3 value = australia
2021-03-07 20:29:25.406  INFO 16936 --- [           main] o.s.s.c.ThreadPoolTaskScheduler          : Initializing ExecutorService 'taskScheduler'
2021-03-07 20:29:25.437  INFO 16936 --- [           main] o.s.c.f.c.c.SimpleFunctionRegistry       : Looking up function '' with acceptedOutputTypes: []
Configuring Streams using application.yml
If you do not want to keep the Streams configuration in a Java class, you can instead use application.yml to specify the Streams configuration, as shown below. This approach uses the Spring Cloud Stream Kafka Streams binder rather than plain spring-kafka. You also need to provide a bean that processes the stream, as shown below in KafkaStreamsProcessorApplication.
spring:
  cloud:
    stream:
      bindings:
        process-in-0.destination: data
      kafka:
        streams:
          binder:
            applicationId: my-app
            brokers: localhost:9092
            configuration:
              default:
                key:
                  serde: org.apache.kafka.common.serialization.Serdes$StringSerde
                value:
                  serde: org.apache.kafka.common.serialization.Serdes$StringSerde
package com.example;

import org.apache.kafka.streams.kstream.KStream;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class KafkaStreamsProcessorApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaStreamsProcessorApplication.class, args);
    }

    @Bean
    public java.util.function.Consumer<KStream<String, String>> process() {
        return stream -> stream.foreach((key, value) -> {
            System.out.println(key + ":" + value);
        });
    }
}
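Note that the functional style above relies on the Spring Cloud Stream Kafka Streams binder, which is not part of the pom.xml from Step 4. As a sketch, the extra dependency looks like the following; the version is normally managed by the Spring Cloud BOM matching your Boot version, which you would also need to add:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
</dependency>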
Creating a Topology
In the example below we create a Topology where we first create firstStream (a KStream) from the source topic, then we create another KStream, upperCaseStream, which converts the values to uppercase, and finally we send the data to the sink, which is another topic ("data-out").
// requires import org.apache.kafka.streams.kstream.Produced
private Topology getTopology() {
    final StreamsBuilder builder = new StreamsBuilder();
    // source: read the stream of records from the "data" topic
    final KStream<String, String> firstStream = builder.stream("data");
    firstStream.foreach((k, v) -> System.out.println("Key = " + k + " value = " + v));
    // transformation: convert each value to uppercase
    final KStream<String, String> upperCaseStream = firstStream.mapValues(data -> data.toUpperCase());
    upperCaseStream.foreach((k, v) -> System.out.println("Key = " + k + " value = " + v));
    // sink: write the transformed records to the "data-out" topic
    upperCaseStream.to("data-out", Produced.with(Serdes.String(), Serdes.String()));
    return builder.build();
}
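Since this topology writes to the "data-out" sink topic, you can watch the transformed records with the console consumer (kafka-console-consumer.bat --topic data-out --from-beginning --bootstrap-server localhost:9092). You can also exercise the topology without a running broker using Kafka's TopologyTestDriver. Below is a minimal sketch assuming the kafka-streams-test-utils artifact (same version as kafka-streams; the ConsumerRecordFactory API shown here is the one from the 2.3 line used in this example) is on the classpath; the class name is our own:

package com.example;

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.test.ConsumerRecordFactory;

public class TopologySketch {

    public static void main(String[] args) {
        // rebuild the same topology as getTopology() above
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> firstStream = builder.stream("data");
        firstStream.mapValues(data -> data.toUpperCase())
                .to("data-out", Produced.with(Serdes.String(), Serdes.String()));
        Topology topology = builder.build();

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "topology-test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); // never contacted by the test driver
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        try (TopologyTestDriver driver = new TopologyTestDriver(topology, props)) {
            ConsumerRecordFactory<String, String> factory =
                    new ConsumerRecordFactory<>("data", new StringSerializer(), new StringSerializer());
            driver.pipeInput(factory.create("1", "asia"));
            // read the transformed record back from the sink topic
            System.out.println(driver.readOutput("data-out",
                    new StringDeserializer(), new StringDeserializer()));
        }
    }
}

With the input record 1:asia, the driver should emit a record with value ASIA on data-out.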