Spring Boot Kafka example
Kafka is a distributed publish/subscribe messaging system that is highly scalable, durable, and fast. It supports multiple producers and consumers, makes messages durable by persisting them to disk, scales out easily, and provides excellent performance.
This example shows how to build a Spring Boot Kafka application. We will create a Spring Boot application that sends a message to a Kafka topic and also consumes the messages published to that topic.
Step 1) Install and configure ZooKeeper
Download ZooKeeper from http://zookeeper.apache.org/releases.html
After extracting ZooKeeper, rename the file "zoo_sample.cfg" to "zoo.cfg" (under <zookeeper_dir>/conf). Then edit it and change dataDir=/tmp/zookeeper to <zookeeper_dir>\data.
Set JAVA_HOME=C:\Program Files\Java\jre1.8.0_261 and launch ZooKeeper using the zkserver command (under <zookeeper_dir>/bin).
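For reference, the edited entry looks roughly like this; C:/zookeeper is only an example path, and forward slashes are used because zoo.cfg is parsed as a Java properties file, where single backslashes act as escape characters:

# <zookeeper_dir>/conf/zoo.cfg
dataDir=C:/zookeeper/data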
Step 2) Install and configure Kafka server
Download Kafka from http://kafka.apache.org/downloads.html
After extracting Kafka, edit <kafka_dir>\config\server.properties and change log.dirs=/tmp/kafka-logs to <kafka_dir>\kafka-logs.
With ZooKeeper running, launch the Kafka server (from <kafka_dir>\bin\windows) using: kafka-server-start.bat ../../config/server.properties
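For reference, the edited entry looks roughly like this; C:/kafka is only an example path, and forward slashes again avoid properties-file escaping issues on Windows:

# <kafka_dir>/config/server.properties
log.dirs=C:/kafka/kafka-logs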
Step 3) Create pom.xml for Spring Boot Kafka Producer and Consumer application
Create a pom.xml file and add the Maven dependencies below.
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
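For context, here is a minimal complete pom.xml sketch. The parent version and the project coordinates below are placeholders; use whichever Spring Boot 2.x release and groupId/artifactId you prefer.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <!-- placeholder version; any recent Spring Boot 2.x release works -->
        <version>2.1.12.RELEASE</version>
    </parent>

    <!-- example coordinates -->
    <groupId>com.example</groupId>
    <artifactId>spring-boot-kafka-example</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>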
Step 4) Create application.yml containing the Kafka configuration
bootstrap-servers: a list of host:port pairs the producer/consumer uses to establish the initial connection to the Kafka cluster
group-id: the consumer group to which the consumer belongs
auto-offset-reset: where the consumer starts reading a partition when no committed offset exists; the default is latest, which means the consumer reads only records produced after it starts up
key-deserializer: class used to deserialize the message key received from Kafka
value-deserializer: class used to deserialize the message value received from Kafka
key-serializer: class used to serialize the message key sent to Kafka
value-serializer: class used to serialize the message value sent to Kafka
server:
  port: 9000
spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      group-id: group_id
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
Alternatively, you can create the same configuration with Java classes, as shown below.
package com.example;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> producerConfiguration = new HashMap<>();
        producerConfiguration.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        producerConfiguration.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerConfiguration.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(producerConfiguration);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
package com.example;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> consumerConfiguration = new HashMap<>();
        consumerConfiguration.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        consumerConfiguration.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
        consumerConfiguration.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerConfiguration.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(consumerConfiguration);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
Step 5) Create KafKaProducerService to publish a message to a topic
Spring provides KafkaTemplate to publish a message to a Kafka topic. The topic is created automatically the first time KafkaTemplate send("test", message) is called, where "test" is the topic name (provided the broker allows automatic topic creation). You can also create the topic explicitly using the following command:
kafka-topics.bat --create --topic test --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092
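As another option (a sketch, not part of the original setup), you can let Spring create the topic at application startup by declaring a NewTopic bean; Spring Boot auto-configures a KafkaAdmin when spring-kafka is on the classpath, and it creates any NewTopic beans that do not already exist on the broker. The class name KafkaTopicConfig is just an example.

package com.example;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class KafkaTopicConfig {

    // The auto-configured KafkaAdmin creates this topic at startup if it is missing.
    @Bean
    public NewTopic testTopic() {
        // topic name, number of partitions, replication factor
        return new NewTopic("test", 1, (short) 1);
    }
}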
package com.example;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafKaProducerService {

    private static final Logger logger = LoggerFactory.getLogger(KafKaProducerService.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String message) {
        this.kafkaTemplate.send("test", message);
        logger.info(String.format("Message sent -> %s", message));
    }
}
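Note that KafkaTemplate.send() is asynchronous. If you want confirmation of the write, a sketch like the following attaches a callback to the returned future; the class name is hypothetical, and it assumes spring-kafka 2.x, where send() returns a ListenableFuture (newer versions return a CompletableFuture instead).

package com.example;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;

@Service
public class KafKaProducerServiceWithCallback {

    private static final Logger logger = LoggerFactory.getLogger(KafKaProducerServiceWithCallback.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String message) {
        // send() returns immediately; the callback fires once the broker acknowledges (or rejects) the record
        ListenableFuture<SendResult<String, String>> future = this.kafkaTemplate.send("test", message);
        future.addCallback(
                result -> logger.info("Message sent -> {} (partition {}, offset {})",
                        message,
                        result.getRecordMetadata().partition(),
                        result.getRecordMetadata().offset()),
                ex -> logger.error("Failed to send message -> {}", message, ex));
    }
}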
Step 6) Create a KafkaProducerController class that exposes an endpoint to send a message from the browser
package com.example;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping(value = "/kafka")
public class KafkaProducerController {

    private final KafKaProducerService producerService;

    @Autowired
    public KafkaProducerController(KafKaProducerService producerService) {
        this.producerService = producerService;
    }

    @GetMapping(value = "/publish")
    public void sendMessageToKafkaTopic(@RequestParam("message") String message) {
        this.producerService.sendMessage(message);
    }
}
Step 7) Create KafKaConsumerService that will consume the message
package com.example;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafKaConsumerService {

    private final Logger logger = LoggerFactory.getLogger(KafKaConsumerService.class);

    @KafkaListener(topics = "test", groupId = "group_id")
    public void consume(String message) {
        logger.info(String.format("Message received -> %s", message));
    }
}
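If the listener also needs the record metadata (key, partition, offset), @KafkaListener can accept the full ConsumerRecord instead of just the String payload. The sketch below is a hypothetical variant; it uses a different group id so it would receive its own copy of every message alongside KafKaConsumerService.

package com.example;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafKaConsumerRecordService {

    private final Logger logger = LoggerFactory.getLogger(KafKaConsumerRecordService.class);

    // Receiving the full ConsumerRecord exposes the key, partition and offset
    // in addition to the message value.
    @KafkaListener(topics = "test", groupId = "group_id_record")
    public void consume(ConsumerRecord<String, String> record) {
        logger.info("Message received -> {} (partition {}, offset {})",
                record.value(), record.partition(), record.offset());
    }
}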
Step 8) Create KafkaApplication class to run the Spring Boot application
package com.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class KafkaApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaApplication.class, args);
    }
}
Step 9) Testing KafkaApplication
Start KafkaApplication, open any browser, and launch http://localhost:9000/kafka/publish?message=hello
The above URL invokes the REST endpoint /kafka/publish exposed by KafkaProducerController. It calls KafKaProducerService sendMessage(), which publishes the message to the Kafka topic "test". Finally, the message is consumed by the KafKaConsumerService consume method. You will see that the message is published and received by the consumer, as shown in the logs below.
2021-02-05 13:14:29.794 INFO 21032 --- [nio-9000-exec-1] o.a.kafka.common.utils.AppInfoParser : Kafka version : 2.0.1
2021-02-05 13:14:29.794 INFO 21032 --- [nio-9000-exec-1] o.a.kafka.common.utils.AppInfoParser : Kafka commitId : fa14705e51bd2ce5
2021-02-05 13:14:29.812 INFO 21032 --- [ad | producer-1] org.apache.kafka.clients.Metadata : Cluster ID: WzUf88tISwO55b5mbmMj1Q
2021-02-05 13:14:29.840 INFO 21032 --- [nio-9000-exec-1] com.example.KafKaProducerService : Message sent -> hello
2021-02-05 13:14:29.915 INFO 21032 --- [ntainer0-0-C-1] com.example.KafKaConsumerService : Message received -> hello
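You can also exercise the same endpoint from a terminal instead of a browser, for example:

curl "http://localhost:9000/kafka/publish?message=hello"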