
    Spring Boot Kafka Streams example

    Stream processing means processing data continuously as it arrives as a stream. Kafka Streams is a library for processing streams of records. The Kafka Streams DSL is a high-level API for building Kafka Streams applications. It uses the KStream abstraction, which represents a stream of key/value records. Kafka Streams applications are typically used to process and transform records and then send them to another topic.
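
    As a quick sketch of the DSL (the topic names here are illustrative placeholders, not the ones used later in this example):

    // Sketch only: StreamsBuilder and KStream come from the org.apache.kafka.streams packages
    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, String> source = builder.stream("input-topic");  // a stream of key/value records
    source.mapValues(value -> value.toUpperCase())                   // transform each value
          .to("output-topic");                                       // send the result to another topic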

    Step 1) Launch zookeeper and kafka server

    For details on launching ZooKeeper and the Kafka server, see this page: Spring Boot Kafka example

    Step 2) Create topic where stream data will be published

    Create a topic named "data" to which a stream of records will be sent and from which the Kafka Streams processor will consume.

    kafka-topics.bat --create --topic data --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092
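
    Optionally, to confirm the topic was created, you can describe it:

    kafka-topics.bat --describe --topic data --bootstrap-server localhost:9092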

    Step 3) Publish data to "data" topic using kafka console

    We will send data as key/value pairs using the command below. Press Ctrl+C to stop sending data.

    kafka-console-producer.bat --topic data --broker-list localhost:9092 --property "parse.key=true" --property "key.separator=:"
    >1:asia
    >2:europe
    >3:australia

    Step 4) Create pom.xml for Spring Boot Kafka Streams Processor application

    Create a pom.xml file as below and keep it in the Java project directory. It contains the Spring Boot Kafka Maven dependencies.

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" 
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
            
        <modelVersion>4.0.0</modelVersion>
            
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.1.6.RELEASE</version>
            <relativePath/>
        </parent>
            
        <groupId>com.example</groupId>
        <artifactId>demo</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>demo</name>
        <description>Demo project for Spring Boot Kafka Streams</description>
    
        <properties>
            <java.version>1.8</java.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-streams</artifactId>
            </dependency> 
                       
        </dependencies>
        <!-- To create an executable jar -->
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
            
    </project>        
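
    With the spring-boot-maven-plugin in place, you can build and run the executable jar (the jar name follows from the artifactId and version above):

    mvn clean package
    java -jar target\demo-0.0.1-SNAPSHOT.jar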

    Step 5) Create KafkaStreamsConfig class which contains the Kafka Streams configuration

    To configure Kafka Streams, you need to provide at least the BOOTSTRAP_SERVERS_CONFIG and APPLICATION_ID_CONFIG properties. BOOTSTRAP_SERVERS_CONFIG points the Kafka Streams application to the Kafka cluster, while APPLICATION_ID_CONFIG identifies the Kafka Streams application and should be unique per cluster.

    In this example we are using the high-level Streams DSL API. First we create a StreamsBuilder and use it to build a Topology, which is a DAG: a directed acyclic graph of transformations applied to a stream of events. In this example the only processing is printing to the console; we are not doing any real transformations yet.

        
    package com.example;
    
    import java.util.Properties;
    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.kstream.KStream;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class KafkaStreamsConfig {
    
        @Bean
        public KafkaStreams kafkaStreams() {
            final Topology topology = getTopology();
            final KafkaStreams streams = new KafkaStreams(topology, streamsConfiguration());
            streams.cleanUp();
            streams.start();
    
            // Add shutdown hook to respond to SIGTERM and gracefully close Kafka Streams
            Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    
            return streams;
        }
    
        public Properties streamsConfiguration() {
            Properties streamsConfiguration = new Properties();
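            // application.id is also used as the consumer group id and as the prefix for internal topic names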
            streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
            streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
            streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
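            // read from the beginning of the topic when no committed offset exists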
            streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
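            // commit consumed offsets every 10 seconds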
            streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
            return streamsConfiguration;
        }
    
        private Topology getTopology() {
            final StreamsBuilder builder = new StreamsBuilder();
            final KStream<String, String> input = builder.stream("data");
    
            input.foreach((k, v) -> System.out.println("Key = " + k + " value = " + v));
    
            return builder.build();
        }
    } 
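
    Note: the bean method itself calls streams.start() and registers a JVM shutdown hook to close the streams; alternatively you could declare the bean with @Bean(destroyMethod = "close") and let Spring close the streams when the application context shuts down.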

    Step 6) Create KafkaStreamsProcessorApplication class which launches the Spring Boot application

        
    package com.example;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    @SpringBootApplication
    public class KafkaStreamsProcessorApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(KafkaStreamsProcessorApplication.class, args);
        }
    
    }

    Step 7) Running KafkaStreamsProcessorApplication

    Start KafkaStreamsProcessorApplication and you will see the records printed on the console as key/value pairs.

    Console Output:

    2021-03-07 20:29:24.736  INFO 16936 --- [-StreamThread-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=my-app-52667270-7861-4e1d-bdfc-3e590673faf1-StreamThread-1-consumer, groupId=my-app] Setting offset for partition data-0 to the committed offset FetchPosition{offset=5, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[LAPTOP-L5SL049K:9092 (id: 0 rack: null)], epoch=absent}}
    Key = 1 value = asia
    Key = 2 value = europe
    Key = 3 value = australia
    2021-03-07 20:29:25.406  INFO 16936 --- [           main] o.s.s.c.ThreadPoolTaskScheduler          : Initializing ExecutorService 'taskScheduler'
    2021-03-07 20:29:25.437  INFO 16936 --- [           main] o.s.c.f.c.c.SimpleFunctionRegistry       : Looking up function '' with acceptedOutputTypes: []    

    Configuring Streams using application.yml

    If you do not want to keep the Streams configuration in a Java class, you can instead use application.yml to specify the Streams configuration, as shown below. You also need to provide a bean that processes the stream, as shown in the KafkaStreamsProcessorApplication below. Note that the bean name (process) must match the binding name (process-in-0).

    spring:
      cloud:
        stream:
          bindings:
            process-in-0:
              destination: data
          kafka:
            streams:
              binder:
                applicationId: my-app
                brokers: localhost:9092
                configuration:
                  default:
                    key:
                      serde: org.apache.kafka.common.serialization.Serdes$StringSerde
                    value:
                      serde: org.apache.kafka.common.serialization.Serdes$StringSerde

    package com.example;
    
    import org.apache.kafka.streams.kstream.KStream;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    
    @SpringBootApplication
    public class KafkaStreamsProcessorApplication {
    	
        public static void main(String[] args) {
            SpringApplication.run(KafkaStreamsProcessorApplication.class, args);
        }
    
        @Bean
        public java.util.function.Consumer<KStream<String, String>> process() {
            return stream -> stream.foreach((key, value) -> {
                System.out.println(key + ":" + value);
            });
        }
    		
    }    
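
    Note that the spring.cloud.stream properties above are handled by the Spring Cloud Stream Kafka Streams binder, which is not included in the pom.xml from Step 4. Assuming Spring Cloud versions are managed through the Spring Cloud BOM, a dependency along these lines would also be needed:

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
    </dependency>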

    Creating Topology

    In the example below we create a Topology where we first create firstStream (a KStream) from the source topic, then create another KStream, upperCaseStream, which converts the data to uppercase, and finally send the data to the sink, which is another topic.

    // Note: Produced (used below) is org.apache.kafka.streams.kstream.Produced
    private Topology getTopology() {
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, String> firstStream = builder.stream("data");
    
        firstStream.foreach((k, v) -> System.out.println("Key = " + k + " value = " + v));
    
        final KStream<String, String> upperCaseStream = firstStream.mapValues(data
            -> data.toUpperCase());
        upperCaseStream.foreach((k, v) -> System.out.println("Key = " + k + " value = " + v));
    		
        upperCaseStream.to("data-out", Produced.with(Serdes.String(), Serdes.String()));
        return builder.build();
    }   
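
    To verify that the uppercased records reach the sink topic, you can read "data-out" with the console consumer (create the topic first, as in Step 2, if auto-creation is disabled):

    kafka-console-consumer.bat --topic data-out --from-beginning --bootstrap-server localhost:9092 --property print.key=true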