1. Overview

In this article, we'll discuss the importance of implementing retry in Kafka.

We'll explore the various options available for implementing it in Spring Boot and discuss the best practices for maximizing the reliability and resilience of a Kafka consumer.

If this is our first time configuring Kafka on Spring and we want to learn more, we can start with an intro article to Spring and Kafka.

2. Project Setup

Let's create a new Spring Boot project and add the spring-kafka dependency:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>3.0.1</version>
</dependency>

Let's create a simple Greeting class to use as the message payload:

public class Greeting {

    private String msg;
    private String name;

    // standard constructors, getters and setters
}

3. Kafka Consumer

A Kafka Consumer is a client application that reads data from a Kafka cluster. It subscribes to one or more topics and consumes published messages. Producers send messages to a topic, a category name in which records are stored and published. Topics are divided into several partitions to allow them to scale horizontally. Each partition is an immutable sequence of messages.

Consumers can read messages from a specific partition by specifying an offset, which is the position of the message within the partition. An ack (acknowledgment) is a message sent by a consumer to a Kafka broker to indicate that it has successfully processed a record. The consumer offset will be updated once the ack is sent.

This ensures the message is consumed and won't be delivered to the current listener again.
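
To make the relationship between offsets and acks concrete, here's a minimal plain-Java sketch. It doesn't use the Kafka client at all: the OffsetCommitSketch class, its list-backed "partition" and the consume method are hypothetical stand-ins that only illustrate how an unacknowledged record leaves the committed offset where it was:

```java
import java.util.List;
import java.util.function.Predicate;

// Hypothetical in-memory stand-in for one partition and one consumer group's
// committed offset. This is not the Kafka client API.
class OffsetCommitSketch {

    private final List<String> partition; // records, addressed by offset (list index)
    private int committedOffset = 0;      // next offset the group will read

    OffsetCommitSketch(List<String> partition) {
        this.partition = partition;
    }

    // Processes records starting at the committed offset and commits after each
    // success. Stops at the first record the handler rejects.
    int consume(Predicate<String> handler) {
        int processed = 0;
        while (committedOffset < partition.size()) {
            String record = partition.get(committedOffset);
            if (!handler.test(record)) {
                break; // no ack: the offset stays on the failed record
            }
            committedOffset++; // ack: move past the processed record
            processed++;
        }
        return processed;
    }

    int committedOffset() {
        return committedOffset;
    }
}
```

If the handler rejects the second of three records, the committed offset stays at 1, so the next call to consume starts again from that same record.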

3.1. Ack Mode

The ack mode determines when the broker updates the consumer's offset.

There are three acknowledgment modes:

  1. auto-commit: the consumer sends an acknowledgment to the broker as soon as it receives a message
  2. after-processing: the consumer only sends an acknowledgment to the broker after it has successfully processed the message
  3. manual: the consumer waits until it receives specific instructions before sending an acknowledgment to the broker

In Spring Kafka, we configure the ack mode on the listener container factory.

Let's create a new bean that makes a new ConcurrentKafkaListenerContainerFactory:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> multiTypeKafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    // Other configurations
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
    factory.afterPropertiesSet();
    return factory;
}

Spring Kafka offers several ack modes that we can configure:

  1. AckMode.RECORD: the container commits the offset after the listener has processed each record
  2. AckMode.BATCH: the container commits the offsets once all the records returned by a poll have been processed; this is the default mode
  3. AckMode.COUNT: the container commits the offsets after a configured number of records has been processed
  4. AckMode.MANUAL: the listener is responsible for acknowledging the messages itself, through the Acknowledgment object
  5. AckMode.TIME: the container commits the offsets after a configured amount of time has passed since the last commit

To implement retry logic for message processing in Kafka, we need to select an appropriate AckMode.

The AckMode should let the consumer indicate exactly which messages it has processed successfully.

This way, any message that hasn't been acknowledged can be delivered again.

For a blocking retry, this may be the RECORD or MANUAL mode.

4. Blocking Retry

A blocking retry enables the consumer to attempt consuming a message again if the initial attempt fails due to a temporary error.

The consumer waits a certain amount of time, known as the retry backoff period, before trying to consume the message again.

Additionally, the consumer can customize the retry backoff period using either a fixed delay or an exponential backoff strategy.

We can also set a maximum number of retries before the consumer gives up and marks the message as failed.
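
Before looking at the Spring configuration, it may help to see the idea in isolation. The following is a plain-Java sketch of a blocking retry loop with a pluggable backoff; BlockingRetrySketch and its methods are hypothetical names and aren't part of Spring Kafka:

```java
import java.util.function.BooleanSupplier;
import java.util.function.LongUnaryOperator;

// Plain-Java illustration of blocking retry; class and method names are hypothetical.
class BlockingRetrySketch {

    // Fixed delay: every retry waits the same number of milliseconds.
    static LongUnaryOperator fixed(long intervalMs) {
        return retry -> intervalMs;
    }

    // Exponential delay: initial, initial*2, initial*4, ... (retry starts at 1).
    static LongUnaryOperator exponential(long initialMs) {
        return retry -> initialMs << (retry - 1);
    }

    // Runs the task once and then up to maxRetries more times, sleeping on the
    // calling thread between attempts. Returns the number of attempts used,
    // or -1 if every attempt failed or the thread was interrupted.
    static int run(BooleanSupplier task, int maxRetries, LongUnaryOperator backoff) {
        for (int attempt = 1; attempt <= maxRetries + 1; attempt++) {
            if (task.getAsBoolean()) {
                return attempt;
            }
            if (attempt <= maxRetries) {
                try {
                    // the calling (consumer) thread is blocked here
                    Thread.sleep(backoff.applyAsLong(attempt));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return -1;
                }
            }
        }
        return -1;
    }
}
```

The key point is the Thread.sleep call: while it runs, the thread can't pick up any other message, which is exactly what "blocking" means here.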

4.1. Error Handler

Let's define two properties in the Kafka configuration class:

@Value(value = "${kafka.backoff.interval}")
private Long interval;

@Value(value = "${kafka.backoff.max_failure}")
private Long maxAttempts;
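
For these two placeholders to resolve, the application needs matching entries in its configuration. A minimal sketch, assuming an application.properties file; the values are purely illustrative and should be tuned for the workload:

```properties
# Hypothetical example values
# Milliseconds to wait between attempts
kafka.backoff.interval=1000
# Number of retries before giving up
kafka.backoff.max_failure=3
```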

To handle all exceptions thrown during the consuming process, let's define a new error handler:

@Bean
public DefaultErrorHandler errorHandler() {
    BackOff fixedBackOff = new FixedBackOff(interval, maxAttempts);
    DefaultErrorHandler errorHandler = new DefaultErrorHandler((consumerRecord, exception) -> {
        // logic to execute when all the retry attempts are exhausted
    }, fixedBackOff);
    return errorHandler;
}

The FixedBackOff class takes two arguments:

  • interval: The amount of time to wait between retries in milliseconds.
  • maxAttempts: The maximum number of times to retry the operation before giving up.

In this strategy, the consumer waits a fixed time before retrying the message consumption.

The DefaultErrorHandler is being initialized with a lambda function representing the logic to execute when all the retry attempts are exhausted.

The lambda function takes two arguments:

  • consumerRecord: represents the Kafka record that caused the error.
  • exception: represents the exception that was thrown.

4.2. Container Factory

Let's add the error handler to the container factory bean:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> multiTypeKafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    // Other configurations
    factory.setCommonErrorHandler(errorHandler());
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
    factory.afterPropertiesSet();
    return factory;
}

If a retry policy is present, we set the ack mode to AckMode.RECORD so that each message is acknowledged individually and a message that fails during processing is delivered again.

We should avoid AckMode.BATCH or AckMode.TIME here because, in those modes, the consumer acknowledges multiple messages at once.

If an error occurs while processing one message, the other messages in the same batch or time window aren't acknowledged individually, so the retry policy may not be able to handle the error properly.

4.3. Retryable and Non-Retryable Exceptions

We can specify which exceptions are retryable and which are non-retryable.

Let's modify the ErrorHandler:

@Bean
public DefaultErrorHandler errorHandler() {
    BackOff fixedBackOff = new FixedBackOff(interval, maxAttempts);
    DefaultErrorHandler errorHandler = new DefaultErrorHandler((consumerRecord, e) -> {
        // logic to execute when all the retry attempts are exhausted
    }, fixedBackOff);
    errorHandler.addRetryableExceptions(SocketTimeoutException.class);
    errorHandler.addNotRetryableExceptions(NullPointerException.class);
    return errorHandler;
}

So we have specified which exception types should trigger a retry policy in the consumer.

The SocketTimeoutException is considered retryable, while the NullPointerException is considered non-retryable.
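
Conceptually, the error handler consults a classifier before each retry. Here's a simplified plain-Java sketch of that decision; RetryClassifierSketch is a hypothetical class, and the real classifier in Spring Kafka is more elaborate:

```java
import java.util.Set;

// Hypothetical classifier that mirrors the retryable / non-retryable split.
class RetryClassifierSketch {

    private final Set<Class<? extends Exception>> notRetryable;

    RetryClassifierSketch(Set<Class<? extends Exception>> notRetryable) {
        this.notRetryable = notRetryable;
    }

    // An exception is retried unless it is an instance of a non-retryable type.
    boolean shouldRetry(Exception e) {
        return notRetryable.stream().noneMatch(type -> type.isInstance(e));
    }
}
```

With NullPointerException registered as non-retryable, shouldRetry returns true for a SocketTimeoutException and false for a NullPointerException, so the latter goes straight to the recovery logic.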

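If we don't register any exceptions, the DefaultErrorHandler retries every exception except a default set that it treats as fatal, such as DeserializationException, MessageConversionException and ClassCastException.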

4.4. Advantages and Disadvantages

In the blocking retry, when a message processing fails, the consumer blocks until the retry mechanism finishes its retries or until the maximum number of retries is reached.

There are several advantages and disadvantages of using blocking retry.

Blocking retry can improve the reliability of the message processing pipeline by allowing the consumer to retry the consumption of a message if an error occurs. This can help to ensure that messages are processed successfully, even if transient errors occur. 

Blocking retry can simplify the implementation of the message processing logic by abstracting away the retry mechanism. The consumer can focus on processing the message and leave the retry mechanism to handle any errors that may occur.

On the other hand, blocking retry may introduce delays in the message processing pipeline, because the consumer has to wait for the retry mechanism to finish before it moves on to the next message. This can impact the overall performance of the system. The consumer also holds on to its resources, such as its thread and memory, for the whole retry period without doing useful work, which can impact the overall scalability of the system.
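
To get a feel for the delay a single failing message can introduce, we can multiply the backoff interval by the number of retries. The sketch below is plain arithmetic that assumes a fixed backoff and ignores the processing time of each attempt; the class name and the sample values are made up for illustration:

```java
// Back-of-the-envelope estimate; the class name and sample values are hypothetical.
class BlockingDelaySketch {

    // Time spent waiting between attempts for one record that fails every time,
    // assuming a fixed backoff and ignoring the processing time itself.
    static long worstCaseWaitMs(long intervalMs, long maxRetries) {
        return intervalMs * maxRetries;
    }

    public static void main(String[] args) {
        // 9 retries, 10 seconds apart
        System.out.println(worstCaseWaitMs(10_000L, 9L) + " ms"); // prints "90000 ms"
    }
}
```

During that time, every later message in the same partition waits behind the failing one.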

5. Non-Blocking Retry

Non-blocking retry allows the consumer to retry the consumption of a message asynchronously without blocking the execution of the message listener method.

5.1. @RetryableTopic

Let's add the @RetryableTopic annotation to the KafkaListener:

@Component
@KafkaListener(id = "multiGroup", topics = "greeting")
public class MultiTypeKafkaListener {

    @KafkaHandler
    @RetryableTopic(
      backoff = @Backoff(value = 3000L), 
      attempts = "5", 
      autoCreateTopics = "false",
      include = SocketTimeoutException.class, exclude = NullPointerException.class)
    public void handleGreeting(Greeting greeting) {
        System.out.println("Greeting received: " + greeting);
    }
}

We have customized the retry behavior by modifying several properties, such as:

  • backoff: This property specifies the backoff strategy to use when retrying a failed message.
  • attempts: This property specifies the maximum number of delivery attempts, including the first one, before giving up on the message.
  • autoCreateTopics: This property specifies whether or not to automatically create the retry topics and the DLT (Dead Letter Topic) if they do not already exist.
  • include: This property specifies the exceptions that should trigger a retry.
  • exclude: This property specifies the exceptions that should not trigger a retry.

When the listener fails to process a message from the main topic, the message is automatically forwarded to a retry topic, where it's consumed again after the backoff delay.

If processing still fails after the maximum number of attempts, the message is sent to the DLT for further handling.
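
The flow between the main topic, the retry topic and the DLT can be sketched with in-memory queues. This is only a model under simplifying assumptions (a single retry topic, no backoff delay), and RetryTopicSketch with its nested Attempted class is a hypothetical name rather than anything Spring Kafka provides:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Predicate;

// In-memory stand-in for a main topic, one retry topic, and a DLT.
// All names are hypothetical; real retry topics also apply a delay between attempts.
class RetryTopicSketch {

    // A message together with the number of the attempt about to be made.
    static class Attempted {
        final String value;
        final int attempts;

        Attempted(String value, int attempts) {
            this.value = value;
            this.attempts = attempts;
        }
    }

    final List<String> processed = new ArrayList<>();
    final List<String> deadLetters = new ArrayList<>();

    void run(List<String> mainTopic, Predicate<Attempted> handler, int maxAttempts) {
        Deque<Attempted> retryTopic = new ArrayDeque<>();
        // Main listener: a failure is forwarded to the retry topic, and the
        // listener moves straight on to the next record.
        for (String value : mainTopic) {
            deliver(new Attempted(value, 1), handler, maxAttempts, retryTopic);
        }
        // Retry listener: drains the retry topic independently of the main one.
        while (!retryTopic.isEmpty()) {
            deliver(retryTopic.poll(), handler, maxAttempts, retryTopic);
        }
    }

    private void deliver(Attempted msg, Predicate<Attempted> handler, int maxAttempts,
                         Deque<Attempted> retryTopic) {
        if (handler.test(msg)) {
            processed.add(msg.value);
        } else if (msg.attempts < maxAttempts) {
            retryTopic.add(new Attempted(msg.value, msg.attempts + 1));
        } else {
            deadLetters.add(msg.value);
        }
    }
}
```

If the main topic holds "a", "b", "c" and "b" only succeeds on its second attempt, the processed list ends up as a, c, b: the main listener never waited for "b", which is why it finishes after "c".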

5.2. Advantages and Disadvantages

There are several advantages to implementing a non-blocking retry:

  • Improved performance: Non-blocking retries allow for retrying of failed messages without blocking the calling thread, which can improve the overall performance of the application
  • Increased reliability: Non-blocking retries can help the application recover from failures and continue processing messages, even if some messages fail to be delivered

However, there are also some potential disadvantages to consider when implementing non-blocking retries:

  • Increased complexity: Non-blocking retries add complexity to the application, as we need to manage the retry topics and the DLT
  • Risk of message duplication: A retried message may end up being processed more than once, for example, if the original attempt actually took effect before the error was raised. We need to consider this risk and implement measures to prevent message duplication if it's a concern
  • Message ordering: Retried messages are consumed from the retry topic asynchronously, so they may be processed later than messages that were published after them
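
One common measure against duplication is to make the listener idempotent by remembering which messages it has already handled. The sketch below assumes each message carries a unique ID; IdempotentHandlerSketch is a hypothetical class, and a real deployment would keep the IDs in a durable store shared by all consumers rather than in memory:

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical idempotent wrapper: a message ID that was already handled is skipped.
class IdempotentHandlerSketch {

    private final Set<String> seenIds = new HashSet<>();
    private final Consumer<String> delegate;

    IdempotentHandlerSketch(Consumer<String> delegate) {
        this.delegate = delegate;
    }

    // Returns true if the payload was processed, false if it was a duplicate.
    boolean handle(String messageId, String payload) {
        if (seenIds.contains(messageId)) {
            return false;
        }
        delegate.accept(payload); // may throw; the ID is only recorded on success
        seenIds.add(messageId);
        return true;
    }
}
```

Because the ID is recorded only after the delegate returns normally, a failed attempt can still be retried, while a second delivery of an already processed message is ignored.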

6. Conclusion

In this tutorial, we analyzed how to implement retry logic on a Kafka topic, including blocking and non-blocking approaches.

As always, the full source code of the examples can be found over on GitHub.
