
Getting Started with Apache Kafka and Spring Boot

Welcome to another Spring Boot tutorial. Today let’s look at how to get started with Apache Kafka in Spring Boot. This tutorial is based on Confluent’s Getting Started with Apache Kafka and Spring Boot guide, reworked to be a bit easier to follow, so all credit goes to the original authors. You need a Java 11 environment and a Confluent account to follow along.

Photo by Markus Winkler on Unsplash

Head over to https://www.confluent.io and sign up for an account, or log in if you already have one. In the Confluent dashboard click the Add cluster button. In the create-cluster wizard choose either Basic or Standard as the cluster type. In the second step choose your preferred cloud service provider to provision the resources. Finally, give the cluster a name and launch it.

Confluent Cloud

From the dashboard navigate to Cluster overview -> Cluster settings and make a note of the Bootstrap server URL. Then navigate to Data integration -> API Keys and create a new key with the Global access scope. You will need these values later when configuring the Spring Boot application. Finally, navigate to Topics in the side pane and add a new topic named purchases with the default values.

Now let’s head over to Spring Initializr to set up a new Spring Boot project. Refer to the screenshot below and create a new project as instructed. Once done, download the project and open it in your favourite IDE.
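In case the screenshot is hard to read, the only dependency you need beyond the defaults is Spring for Apache Kafka. With a Gradle project, the dependencies block in build.gradle should look roughly like this (exact versions are whatever Spring Initializr generates):

```groovy
dependencies {
    // Spring for Apache Kafka pulls in spring-kafka and the Kafka client libraries
    implementation 'org.springframework.kafka:spring-kafka'
    implementation 'org.springframework.boot:spring-boot-starter'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
}
```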

Spring Initializr

Navigate to the src/main/resources folder and create a file named application.yaml. Make sure to replace the placeholders with the actual values for the Bootstrap Server URL, Cluster API Key and Cluster API Secret.

spring:
  kafka:
    bootstrap-servers: <BOOTSTRAP_SERVER_URL>
    properties:
      security:
        protocol: SASL_SSL
      sasl:
        jaas:
          config: org.apache.kafka.common.security.plain.PlainLoginModule required username='<CLUSTER_API_KEY>' password='<CLUSTER_API_SECRET>';
        mechanism: PLAIN
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: group_id
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
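Note that Spring Boot flattens the nested keys under spring.kafka.properties into the dotted client settings the Kafka clients expect. For reference, the security-related part of the same configuration in application.properties form would be:

```properties
spring.kafka.bootstrap-servers=<BOOTSTRAP_SERVER_URL>
spring.kafka.properties.security.protocol=SASL_SSL
spring.kafka.properties.sasl.mechanism=PLAIN
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='<CLUSTER_API_KEY>' password='<CLUSTER_API_SECRET>';
```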

Then open the SpringBootWithKafkaApplication.java file and update it as shown below.

package com.example.kafka;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;

@SpringBootApplication
public class SpringBootWithKafkaApplication {

    private final Producer producer;

    @Autowired
    private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    @Autowired
    SpringBootWithKafkaApplication(Producer producer) {
        this.producer = producer;
    }

    public static void main(String[] args) {
        SpringApplication application = new SpringApplication(SpringBootWithKafkaApplication.class);
        application.setWebApplicationType(WebApplicationType.NONE);
        application.run(args);
    }

    @Bean
    public CommandLineRunner commandLineRunner() {
        return (args) -> {
            for (String arg : args) {
                switch (arg) {
                    case "--producer":
                        this.producer.sendMessage("awalther", "t-shirts");
                        this.producer.sendMessage("htanaka", "t-shirts");
                        this.producer.sendMessage("htanaka", "batteries");
                        this.producer.sendMessage("eabara", "t-shirts");
                        this.producer.sendMessage("htanaka", "t-shirts");
                        this.producer.sendMessage("jsmith", "book");
                        this.producer.sendMessage("awalther", "t-shirts");
                        this.producer.sendMessage("jsmith", "batteries");
                        this.producer.sendMessage("jsmith", "gift card");
                        this.producer.sendMessage("eabara", "t-shirts");
                        break;
                    case "--consumer":
                        MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer("myConsumer");
                        listenerContainer.start();
                        break;
                    default:
                        break;
                }
            }
        };
    }

}

After that, create a new Java class called Producer under the com.example.kafka package and add the below content to it.

package com.example.kafka;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

@Service
public class Producer {

    private static final String TOPIC = "purchases";

    private static final Logger logger = LoggerFactory.getLogger(Producer.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String key, String value) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(TOPIC, key, value);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                logger.info(String.format("Produced event to topic %s: key = %-10s value = %s", TOPIC, key, value));
            }
            @Override
            public void onFailure(Throwable ex) {
                // Log the failure instead of printing the stack trace to stderr
                logger.error(String.format("Failed to produce event to topic %s", TOPIC), ex);
            }
        });
    }

}
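Note that the producer sends each purchase keyed by user name. Kafka’s default partitioner hashes the record key (using murmur2) so that every record with the same key lands on the same partition, preserving per-user ordering. Below is a simplified sketch of that idea; it substitutes Java’s hashCode for murmur2 and is an illustration only, not Kafka’s actual algorithm:

```java
import java.util.HashMap;
import java.util.Map;

public class PartitionSketch {

    // Simplified stand-in for Kafka's default partitioner:
    // the same key always maps to the same partition.
    // (Kafka really hashes the serialized key with murmur2, not hashCode.)
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int partitions = 6; // Confluent Cloud's default partition count for a new topic
        Map<String, Integer> assigned = new HashMap<>();
        for (String user : new String[]{"awalther", "htanaka", "jsmith", "htanaka"}) {
            int p = partitionFor(user, partitions);
            // Verify that a key never maps to two different partitions
            assigned.merge(user, p, (oldP, newP) -> {
                if (!oldP.equals(newP)) throw new IllegalStateException("unstable mapping");
                return newP;
            });
            System.out.println(user + " -> partition " + p);
        }
    }
}
```

This per-key ordering is why the consumer below can rely on seeing one user’s purchases in the order they were produced, as long as the key stays the same.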

Finally, create another class called Consumer under the com.example.kafka package and add the below content to it.

package com.example.kafka;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;

@Service
public class Consumer {

    private final Logger logger = LoggerFactory.getLogger(Consumer.class);

    @KafkaListener(id = "myConsumer", topics = "purchases", groupId = "spring-boot", autoStartup = "false")
    public void listen(String value,
                       @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                       @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
        logger.info(String.format("Consumed event from topic %s: key = %-10s value = %s", topic, key, value));
    }

}

Run the below command to start the Spring Boot application as a producer.

./gradlew bootRun --args='--producer'

Run the below command to start the Spring Boot application as a consumer.

./gradlew bootRun --args='--consumer'
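If everything is wired up correctly, the producer run should print log lines along these lines (timestamps and thread names omitted; the exact padding comes from the %-10s format in the Producer class):

```text
Produced event to topic purchases: key = awalther   value = t-shirts
Produced event to topic purchases: key = htanaka    value = t-shirts
Produced event to topic purchases: key = htanaka    value = batteries
...
```

The consumer run prints matching "Consumed event from topic purchases" lines once the listener container named myConsumer is started.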

Happy coding! Below is a DEV Community video I have published. You might also be interested in my Medium story Writing unit tests in Spring Boot with JUnit 5.

© 2022 Afrar Malakooth
