1. Overview

In this article, we’ll learn how to configure multiple listeners for the same Kafka topic by looking at a practical example.

If this is the first time configuring Kafka on Spring, a good place to start is with our introduction to Apache Kafka with Spring.

2. Project Setup

Let’s build a book consumer service that listens for newly arriving books in the library and consumes them for different purposes, such as full-text content search, price indexing, or user notifications.

First, let’s create a Spring Boot service and add the spring-kafka dependency:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

Additionally, let’s define the BookEvent that the listeners will consume:

public class BookEvent {

    private String title;
    private String description;
    private Double price;

    //  standard constructors, getters and setters
}
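
Since the listeners will log incoming events, the console output later in the article prints them in the form BookEvent(title=..., description=..., price=...). This assumes BookEvent overrides toString() (Lombok’s @Data would produce the same format); a minimal handwritten sketch:

@Override
public String toString() {
    // matches the BookEvent(title=..., description=..., price=...) format shown in the logs below
    return "BookEvent(title=" + title + ", description=" + description + ", price=" + price + ")";
}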

3. Producing Messages

Kafka producers are critical to the ecosystem because they write messages to the Kafka cluster. So, before we can consume anything, we first need a producer that writes messages to a topic, which the consumer application will later read.

Following our example, let’s write a simple Kafka producer function that writes new BookEvent objects to the “books” topic:

private static final String TOPIC = "books";

@Autowired
private KafkaTemplate<String, BookEvent> bookEventKafkaTemplate;

public void sendBookEvent(BookEvent book) {
    bookEventKafkaTemplate.send(TOPIC, UUID.randomUUID().toString(), book);
}
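
The bookEventKafkaTemplate we autowire above also needs a producer configuration. The following is only a minimal sketch, assuming a single local broker on localhost:9092 and JSON serialization of the BookEvent values (the configuration class name and the broker address are illustrative, not part of the original example):

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, BookEvent> bookEventProducerFactory() {
        Map<String, Object> props = new HashMap<>();
        // assumed local broker address
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // keys are the generated UUID strings, values are JSON-serialized BookEvent objects
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, BookEvent> bookEventKafkaTemplate() {
        return new KafkaTemplate<>(bookEventProducerFactory());
    }
}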

4. Consuming the Same Kafka Topic From Multiple Listeners

Kafka consumers are client applications that subscribe to one or more topics of a Kafka cluster. In the following subsections, we’ll look at how to set up multiple listeners on the same topic.

4.1. Consumer Configuration

First, to configure the consumers, we need to define the ConcurrentKafkaListenerContainerFactory bean that the listeners require.

Now, let’s define the container factory that we’ll use to consume BookEvent objects:

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, BookEvent> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, BookEvent> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    public ConsumerFactory<String, BookEvent> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        
        // required consumer factory properties

        return new DefaultKafkaConsumerFactory<>(props);
    }
}
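
For completeness, the elided consumer factory properties typically cover the broker address and the deserializers. A minimal sketch that matches the producer configuration shown earlier, assuming the same local broker and JSON-serialized BookEvent values (the wildcard trusted-packages setting is a convenience for local testing only):

public ConsumerFactory<String, BookEvent> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    // assumed local broker address
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    // keys are plain strings, values are deserialized from JSON back into BookEvent objects
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, BookEvent.class);
    props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
    return new DefaultKafkaConsumerFactory<>(props);
}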

Next, we’ll look at different strategies for listening to incoming messages.

4.2. Multiple Listeners With the Same Consumer Group

One strategy is to increase the concurrency level of a single consumer group so that multiple listener instances share the work within that group. We can specify this directly in the @KafkaListener annotation.

To understand how this works, let’s define a notification listener for our library:

@KafkaListener(topics = "books", groupId = "book-notification-consumer", concurrency = "2")
public void bookNotificationConsumer(BookEvent event) {
    logger.info("Books event received for notification => {}", event);
}

Next, let’s publish three messages and look at the console output to understand why each message is consumed only once:

Books event received for notification => BookEvent(title=book 1, description=description 1, price=1.0)
Books event received for notification => BookEvent(title=book 2, description=description 2, price=2.0)
Books event received for notification => BookEvent(title=book 3, description=description 3, price=3.0)

This happens because, internally, Spring Kafka creates one listener instance per unit of concurrency, all within the same consumer group. The listener instances in a consumer group split the topic’s partitions, and therefore the messages, among themselves, so each message is processed only once while the overall work finishes faster and throughput increases.
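
One detail worth keeping in mind: a concurrency level of two only pays off if the “books” topic has at least two partitions, because each listener instance in the group is assigned its own subset of partitions. A minimal sketch of declaring such a topic, assuming Spring Boot’s auto-configured KafkaAdmin and a single local broker (hence the replication factor of 1):

@Bean
public NewTopic booksTopic() {
    // two partitions so both concurrent listener instances receive work;
    // a replication factor of 1 is only suitable for a single-broker local setup
    return TopicBuilder.name("books")
      .partitions(2)
      .replicas(1)
      .build();
}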

4.3. Multiple Listeners With Different Consumer Groups

If we need to consume the same messages multiple times and apply distinct processing logic for each listener, we must configure the @KafkaListener annotations with distinct group IDs. Kafka then creates a dedicated consumer group for each listener and delivers every published message to each of them.

To see this strategy in action, let’s define one listener for full-text search indexing and one responsible for price indexing. Both will listen to the same “books” topic:

@KafkaListener(topics = "books", groupId = "books-content-search")
public void bookContentSearchConsumer(BookEvent event) {
    logger.info("Books event received for full-text search indexing => {}", event);
}

@KafkaListener(topics = "books", groupId = "books-price-index")
public void bookPriceIndexerConsumer(BookEvent event) {
    logger.info("Books event received for price indexing => {}", event);
}

Now, let’s run the code above and analyze the output:

Books event received for price indexing => BookEvent(title=book 1, description=description 1, price=1.0)
Books event received for full-text search indexing => BookEvent(title=book 1, description=description 1, price=1.0)
Books event received for full-text search indexing => BookEvent(title=book 2, description=description 2, price=2.0)
Books event received for price indexing => BookEvent(title=book 2, description=description 2, price=2.0)
Books event received for full-text search indexing => BookEvent(title=book 3, description=description 3, price=3.0)
Books event received for price indexing => BookEvent(title=book 3, description=description 3, price=3.0)

As we can see, both listeners receive each BookEvent and can apply independent processing logic for all incoming messages.

5. When to Use the Different Listener Strategies

As we’ve already learned, we can set up multiple listeners either by configuring the concurrency property of the @KafkaListener annotation with a value greater than one or by defining multiple @KafkaListener methods that listen to the same Kafka topic but have different consumer group IDs.

Choosing one strategy over the other depends on what we want to achieve. If the goal is purely performance, that is, increasing throughput by processing messages faster, the right strategy is to increase the number of listeners within the same consumer group.

However, to process the same message multiple times to fulfill different requirements, we should define dedicated listeners with distinct consumer groups that listen to the same topic.

As a rule of thumb, we should use one consumer group for each requirement we need to fulfill, and if we need to make that listener faster, we can increase the number of listeners within the same consumer group.
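
For instance, if the price-indexing listener from earlier became a bottleneck, we could keep its dedicated consumer group and simply raise its concurrency, remembering that the topic needs at least as many partitions as the chosen concurrency level (the value of 2 below is illustrative):

@KafkaListener(topics = "books", groupId = "books-price-index", concurrency = "2")
public void bookPriceIndexerConsumer(BookEvent event) {
    logger.info("Books event received for price indexing => {}", event);
}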

6. Conclusion

In this article, we learned how to configure multiple listeners for the same topic using the Spring Kafka library, looking at a practical example of a book library. We started with the producer and consumer configuration and continued with the different ways to add multiple listeners for the same topic.

As always, the complete source code of the examples can be found over on GitHub.
