
    Spring Boot Kafka example

    Kafka is a distributed publish/subscribe messaging system that is highly scalable, durable and fast. It supports multiple producers and multiple consumers, persists messages to disk so that they are durable, and provides excellent performance.
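
To make the publish/subscribe idea concrete, here is a toy in-memory sketch of a single-partition topic. It is not how Kafka is implemented; the class TopicLogSketch and its methods are hypothetical names used only for illustration. It assumes that a consumer group seen for the first time starts reading from the beginning of the log, which corresponds to the auto-offset-reset: earliest setting used later in this example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy, in-memory stand-in for a single-partition topic; not Kafka's implementation. */
public class TopicLogSketch {
    // Append-only list of messages; the list index plays the role of the offset.
    private final List<String> log = new ArrayList<>();
    // Next offset to read, tracked separately for every consumer group.
    private final Map<String, Integer> groupOffsets = new HashMap<>();

    /** Appends a message and returns the offset it was stored at. */
    public int publish(String message) {
        log.add(message);
        return log.size() - 1;
    }

    /** Returns all messages the group has not read yet and advances its offset. */
    public List<String> poll(String groupId) {
        // Assumption: groups without a stored offset start at the beginning of the log.
        int from = groupOffsets.getOrDefault(groupId, 0);
        List<String> unread = new ArrayList<>(log.subList(from, log.size()));
        groupOffsets.put(groupId, log.size());
        return unread;
    }

    public static void main(String[] args) {
        TopicLogSketch topic = new TopicLogSketch();
        topic.publish("hello");
        topic.publish("world");
        System.out.println(topic.poll("group_a")); // [hello, world]
        System.out.println(topic.poll("group_b")); // [hello, world]
        System.out.println(topic.poll("group_a")); // []
    }
}
```

Messages stay in the log after they are read, so every group receives every message, and a group that polls again only sees what was published since its last poll.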

    This example shows how to build a Spring Boot Kafka application. We will create a Spring Boot application that sends a message to a Kafka topic and also consumes the messages published to that topic.

    Step 1) Install and configure ZooKeeper

    Download ZooKeeper from http://zookeeper.apache.org/releases.html

    After extracting ZooKeeper, rename the file "zoo_sample.cfg" to "zoo.cfg" (under <zookeeper_dir>/conf). Then edit it and change dataDir=/tmp/zookeeper to dataDir=<zookeeper_dir>\data . Because zoo.cfg uses Java properties syntax, write a Windows path with forward slashes or doubled backslashes.
    Set JAVA_HOME=C:\Program Files\Java\jre1.8.0_261 and launch ZooKeeper by running zkServer.cmd (under <zookeeper_dir>/bin).
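
A common pitfall when editing dataDir on Windows is that a single backslash is an escape character in Java properties syntax, which zoo.cfg uses. The short check below, using only java.util.Properties, shows what happens to such a path. The class name PropertiesPathCheck and the path C:\zookeeper\data are placeholders chosen for this sketch.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

public class PropertiesPathCheck {
    /** Parses one line of properties text and returns the value stored under the key. */
    public static String read(String line, String key) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(line));
        return props.getProperty(key);
    }

    public static void main(String[] args) throws IOException {
        // File content: dataDir=C:\zookeeper\data  (single backslashes are swallowed)
        System.out.println(read("dataDir=C:\\zookeeper\\data", "dataDir"));     // C:zookeeperdata
        // File content: dataDir=C:\\zookeeper\\data  (doubled backslashes survive)
        System.out.println(read("dataDir=C:\\\\zookeeper\\\\data", "dataDir")); // C:\zookeeper\data
        // File content: dataDir=C:/zookeeper/data  (forward slashes need no escaping)
        System.out.println(read("dataDir=C:/zookeeper/data", "dataDir"));       // C:/zookeeper/data
    }
}
```

Forward slashes are usually the simplest choice, since they read the same in the file and in the parsed value.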

    Step 2) Install and configure Kafka server

    Download Kafka from http://kafka.apache.org/downloads.html

    After extracting Kafka, edit <kafka_dir>\config\server.properties and change log.dirs=/tmp/kafka-logs to log.dirs=<kafka_dir>\kafka-logs (written with forward slashes or doubled backslashes).
    Launch the Kafka server from <kafka_dir>\bin\windows using: kafka-server-start.bat ../../config/server.properties
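
Before moving on to the application, it can help to confirm that both servers are actually listening. The sketch below simply tries to open a TCP connection. It assumes ZooKeeper is on its default client port 2181 (the clientPort value in zoo_sample.cfg) and Kafka on 9092 (the port used in the configuration later in this example); PortCheck is a hypothetical helper, not part of either product.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortCheck {
    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    public static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false; // connection refused, host unreachable, or timed out
        }
    }

    public static void main(String[] args) {
        // Assumed default ports; adjust if zoo.cfg or server.properties were changed.
        System.out.println("ZooKeeper (2181): " + isListening("localhost", 2181, 1000));
        System.out.println("Kafka (9092):     " + isListening("localhost", 9092, 1000));
    }
}
```

A successful connection only shows that something is listening on the port; it does not prove that the broker is healthy.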

    Step 3) Create pom.xml for Spring Boot Kafka Producer and Consumer application

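    Create a pom.xml file and add the Maven dependencies below. No versions are specified, so this assumes the project inherits from spring-boot-starter-parent (or imports the Spring Boot dependencies BOM), which manages them.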

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>	
    
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency> 

    Step 4) Create application.yml containing the Kafka configuration


    server:
      port: 9000
    spring:
      kafka:
        consumer:
          bootstrap-servers: localhost:9092
          group-id: group_id
          auto-offset-reset: earliest
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        producer:
          bootstrap-servers: localhost:9092
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer

    Alternatively, you can create the configuration using Java classes, one for the producer and one for the consumer, as shown below.

        
    package com.example;
    
    import java.util.HashMap;
    import java.util.Map;
    
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.core.DefaultKafkaProducerFactory;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.core.ProducerFactory;
    
    @Configuration
    public class KafkaProducerConfig {
    
        @Bean
        public ProducerFactory<String, String> producerFactory() {
            Map<String, Object> producerConfiguration = new HashMap<>();
            producerConfiguration.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            producerConfiguration.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            producerConfiguration.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            return new DefaultKafkaProducerFactory<>(producerConfiguration);
        }
    
        @Bean
        public KafkaTemplate<String, String> kafkaTemplate() {
            return new KafkaTemplate<>(producerFactory());
        }
    } 

        
    package com.example;
    
    import java.util.HashMap;
    import java.util.Map;
    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    
    @Configuration
    public class KafkaConsumerConfig {
    
        @Bean
        public ConsumerFactory<String, String> consumerFactory() {
            Map<String, Object> consumerConfiguration = new HashMap<>();
            consumerConfiguration.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
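            consumerConfiguration.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
            // Match auto-offset-reset: earliest from application.yml (Kafka's default is latest)
            consumerConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");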
            consumerConfiguration.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            consumerConfiguration.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            return new DefaultKafkaConsumerFactory<>(consumerConfiguration);
        }
    
        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            return factory;
        }
    
    }

    Step 5) Create KafKaProducerService to publish messages to a topic

    Spring provides KafkaTemplate to publish a message to a Kafka topic. With the broker's default setting (auto.create.topics.enable=true), the topic is created automatically the first time kafkaTemplate.send("test", message) is called, where "test" is the topic name. You can also create the topic explicitly with the following command:
    kafka-topics.bat --create --topic test --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092

    package com.example;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.stereotype.Service;
    
    @Service
    public class KafKaProducerService {
        private static final Logger logger = LoggerFactory.getLogger(KafKaProducerService.class);
    
        @Autowired
        private KafkaTemplate<String, String> kafkaTemplate;
    
        public void sendMessage(String message) {
            // send() is asynchronous: it returns before the broker acknowledges the record
            this.kafkaTemplate.send("test", message);
            logger.info("Message sent -> {}", message);
        }
    }  
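
KafkaTemplate.send() returns a future, so the service above logs "Message sent" before the broker has confirmed anything. The exact future type depends on the spring-kafka version, so the sketch below uses only the JDK's CompletableFuture and a hypothetical sender function standing in for the template. It illustrates the pattern of reporting the outcome when the future completes; it is not the spring-kafka API itself.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;

public class AsyncSendSketch {
    /**
     * Sends via the supplied asynchronous sender and produces the report text only
     * once the returned future completes. The sender is a hypothetical stand-in
     * for a call such as kafkaTemplate.send(topic, message).
     */
    public static CompletableFuture<String> sendAndReport(
            BiFunction<String, String, CompletableFuture<Void>> sender,
            String topic, String message) {
        return sender.apply(topic, message).handle((ignored, error) ->
                error == null
                        ? "Message sent -> " + message
                        : "Send failed -> " + message + " (" + error.getMessage() + ")");
    }

    public static void main(String[] args) {
        // A fake sender that always succeeds immediately.
        BiFunction<String, String, CompletableFuture<Void>> okSender =
                (topic, msg) -> CompletableFuture.completedFuture(null);
        System.out.println(sendAndReport(okSender, "test", "hello").join()); // Message sent -> hello
    }
}
```

In a real service the same idea would be applied to the future returned by the template, so that a failed send shows up in the logs instead of a misleading success message.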

    Step 6) Create KafkaProducerController class that will expose an endpoint to send messages from a browser

    package com.example;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RequestParam;
    import org.springframework.web.bind.annotation.RestController;
    
    @RestController
    @RequestMapping(value = "/kafka")
    public class KafkaProducerController {
        private final KafKaProducerService producerService;
    
        @Autowired
        public KafkaProducerController(KafKaProducerService producerService) {
            this.producerService = producerService;
        }
    
        @GetMapping(value = "/publish")
        public void sendMessageToKafkaTopic(@RequestParam("message") String message) {
            this.producerService.sendMessage(message);
        }
    }    

    Step 7) Create KafKaConsumerService that will consume the message

    package com.example;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Service;
    
    @Service
    public class KafKaConsumerService {
        private static final Logger logger = LoggerFactory.getLogger(KafKaConsumerService.class);
    
        // Invoked by Spring Kafka for every record received from the "test" topic
        @KafkaListener(topics = "test", groupId = "group_id")
        public void consume(String message) {
            logger.info("Message recieved -> {}", message);
        }
    
    }      

    Step 8) Create KafkaApplication class to run the Spring Boot application

    package com.example;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    @SpringBootApplication
    public class KafkaApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(KafkaApplication.class, args);
        }
    }        

    Step 9) Testing KafkaApplication

    Start KafkaApplication, then open http://localhost:9000/kafka/publish?message=hello in any browser.

    This URL invokes the REST endpoint /kafka/publish exposed by KafkaProducerController, which calls KafKaProducerService.sendMessage() to publish the message to the Kafka topic "test". The message is then consumed by KafKaConsumerService.consume(). The logs below show the message being published and then received by the consumer.

    2021-02-05 13:14:29.794  INFO 21032 --- [nio-9000-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 2.0.1
    2021-02-05 13:14:29.794  INFO 21032 --- [nio-9000-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : fa14705e51bd2ce5
    2021-02-05 13:14:29.812  INFO 21032 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : Cluster ID: WzUf88tISwO55b5mbmMj1Q
    2021-02-05 13:14:29.840  INFO 21032 --- [nio-9000-exec-1] com.example.KafKaProducerService         : Message sent -> hello
    2021-02-05 13:14:29.915  INFO 21032 --- [ntainer0-0-C-1] com.example.KafKaConsumerService         : Message recieved -> hello
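
The test URL above works as typed because "hello" contains no special characters. For messages with spaces or characters such as "&", the query value needs to be URL-encoded. Here is a minimal sketch using the JDK's URLEncoder; PublishUrlBuilder is a hypothetical helper, and the base URL assumes the port 9000 configured in application.yml.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

public class PublishUrlBuilder {
    /** Builds the publish URL for the given message, URL-encoding the query value. */
    public static String publishUrl(String baseUrl, String message) {
        try {
            return baseUrl + "/kafka/publish?message=" + URLEncoder.encode(message, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 support is mandatory on every Java platform.
            throw new IllegalStateException("UTF-8 is not available", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(publishUrl("http://localhost:9000", "hello world & more"));
        // http://localhost:9000/kafka/publish?message=hello+world+%26+more
    }
}
```

Without encoding, the "&" would be read as the start of a second query parameter and the message would be cut off at that point.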
        
