#!/bin/bash
-IMAGE=juplo/spring-consumer:1.1-deserialization-error-SNAPSHOT
+IMAGE=juplo/spring-consumer:1.1-error-handling-SNAPSHOT
if [ "$1" = "cleanup" ]
then
}
group = 'de.juplo.kafka'
-version = '1.1-deserialization-error-SNAPSHOT'
+version = '1.1-error-handling-SNAPSHOT'
java {
toolchain {
juplo.consumer.topic: test
peter:
- image: juplo/spring-consumer:1.1-deserialization-error-SNAPSHOT
+ image: juplo/spring-consumer:1.1-error-handling-SNAPSHOT
environment:
juplo.bootstrap-server: kafka:9092
juplo.client-id: peter
juplo.consumer.topic: test
ute:
- image: juplo/spring-consumer:1.1-deserialization-error-SNAPSHOT
+ image: juplo/spring-consumer:1.1-error-handling-SNAPSHOT
environment:
juplo.bootstrap-server: kafka:9092
juplo.client-id: ute
<artifactId>spring-consumer</artifactId>
<name>Spring Consumer</name>
<description>Super Simple Consumer-Group, that is implemented as Spring-Boot application and configured by Spring Kafka</description>
- <version>1.1-deserialization-error-SNAPSHOT</version>
+ <version>1.1-error-handling-SNAPSHOT</version>
<properties>
<java.version>21</java.version>
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
+import org.springframework.util.backoff.BackOff;
+import org.springframework.util.backoff.FixedBackOff;
+import java.time.Clock;
import java.util.Properties;
Consumer<String, Long> kafkaConsumer,
RecordHandler<String, Long> recordHandler,
ApplicationProperties properties,
+ Clock clock,
+ BackOff backOffStrategy,
ConfigurableApplicationContext applicationContext)
{
return
properties.getConsumerProperties().getTopic(),
kafkaConsumer,
recordHandler,
+ clock,
+ properties.getConsumerProperties().getPollRequestTimeout(),
+ properties.getConsumerProperties().getMaxPollInterval(),
+ properties.getConsumerProperties().getMaxTimePerRecord(),
+ properties.getConsumerProperties().getMinSlackPerPollInterval(),
+ backOffStrategy,
() -> applicationContext.close());
}
return (topic, partition, offset, key, value) -> log.info("No-Ops Handler called for {}={}", key, value);
}
+ /**
+  * Creates the {@link BackOff} strategy bean: a {@link FixedBackOff} with an
+  * interval of 0 ms, whose maximum number of attempts is taken from the
+  * configured number of retries of the consumer properties.
+  *
+  * @param properties the application properties, that supply the number of retries
+  * @return the back-off strategy
+  */
+ @Bean
+ public BackOff backOffStrategy(ApplicationProperties properties)
+ {
+   long interval = 0L;
+   long maxAttempts = properties.getConsumerProperties().getNumRetries();
+   return new FixedBackOff(interval, maxAttempts);
+ }
+
+ /**
+  * Provides the {@link Clock} bean.
+  *
+  * @return the clock obtained from {@link Clock#systemDefaultZone()}
+  */
+ @Bean
+ public Clock clock()
+ {
+   Clock systemClock = Clock.systemDefaultZone();
+   return systemClock;
+ }
+
@Bean(destroyMethod = "")
public KafkaConsumer<String, Long> kafkaConsumer(ApplicationProperties properties)
{
{
props.put("auto.offset.reset", properties.getConsumerProperties().getAutoOffsetReset().name());
}
- if (properties.getConsumerProperties().getAutoCommitInterval() != null)
- {
- props.put("auto.commit.interval", properties.getConsumerProperties().getAutoCommitInterval());
- }
+ props.put("auto.commit.interval", properties.getConsumerProperties().getAutoCommitInterval());
props.put("metadata.maxage.ms", 5000); // 5 Sekunden
+ props.put("max.poll.interval.ms", (int) properties.getConsumer().getMaxPollInterval().toMillis());
+ props.put("max.poll.interval.records", properties.getConsumer().getMaxPollRecords());
+ props.put("fetch.max.wait.ms", (int)properties.getConsumer().getFetchMaxWait().toMillis());
props.put("partition.assignment.strategy", StickyAssignor.class.getName());
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", LongDeserializer.class.getName());
@NotEmpty
private String topic;
private OffsetReset autoOffsetReset;
+ @NotNull
private Duration autoCommitInterval;
+ @NotNull
+ private Duration pollRequestTimeout;
+ @NotNull
+ private Duration maxPollInterval;
+ @NotNull
+ private int maxPollRecords;
+ @NotNull
+ private Duration fetchMaxWait;
+ @NotNull
+ private Duration maxTimePerRecord;
+ @NotNull
+ private Duration minSlackPerPollInterval;
+ @NotNull
+ private int numRetries;
enum OffsetReset { latest, earliest, none }
}
--- /dev/null
+package de.juplo.kafka;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.util.backoff.BackOff;
+import org.springframework.util.backoff.BackOffExecution;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+
+
+/**
+ * Keeps track of the back-off for the retries of a single record, that is
+ * identified by its offset.
+ * <p>
+ * {@link #start(long)} begins a new back-off for an offset and asks the
+ * configured {@link BackOff} strategy for the first waiting time.
+ * {@link #isWaitingForNextRetry()} compares the current instant of the
+ * {@link Clock} with the instant, at which the next retry is due, and
+ * advances to the next back-off interval, as soon as that instant has passed.
+ * If the strategy answers with {@link BackOffExecution#STOP}, no further
+ * retry is scheduled and {@link #isCompleted(long)} reports {@code true}.
+ */
+@RequiredArgsConstructor
+@Slf4j
+class BackOffState
+{
+  private final String logPrefix;
+  private final Clock clock;
+  private final BackOff backOffStrategy;
+
+  @Getter
+  private long offset;
+  private BackOffExecution backOffExecution;
+  private int numRetries = 0;
+  private Instant timeNextRetryIsDue;
+
+
+  /**
+   * Starts a new back-off for the given offset.
+   *
+   * @param offset the offset of the record, that has to be retried
+   */
+  void start(long offset)
+  {
+    this.offset = offset;
+    // Every back-off counts its own retries: without this, the retry number
+    // in the log would keep growing across unrelated offsets
+    numRetries = 0;
+    log.info("{} - Back-Off requested for offset={}", logPrefix, offset);
+    backOffExecution = backOffStrategy.start();
+    initializeNextBackOff();
+  }
+
+  /**
+   * Checks, whether the next retry has to be postponed.
+   * <p>
+   * If the due time has passed, the next back-off interval is requested from
+   * the strategy as a side effect.
+   *
+   * @return {@code true}, if a retry is scheduled and its due time has not
+   *         passed yet; {@code false}, if no retry is scheduled or the due
+   *         time has passed
+   */
+  boolean isWaitingForNextRetry()
+  {
+    if (timeNextRetryIsDue == null)
+    {
+      return false;
+    }
+
+    Instant now = clock.instant();
+    Duration remaining = Duration.between(now, timeNextRetryIsDue);
+    if (remaining.isNegative())
+    {
+      log.info(
+        "{} - {}. retry for offset={}, lateness: {}",
+        logPrefix,
+        numRetries,
+        offset,
+        remaining.abs());
+      initializeNextBackOff();
+      return false;
+    }
+    else
+    {
+      // A remaining time of exactly zero still counts as waiting
+      log.info(
+        "{} - Next retry for offset={} is due in {}",
+        logPrefix,
+        offset,
+        remaining);
+      return true;
+    }
+  }
+
+  /**
+   * @param offset the offset to check
+   * @return {@code true}, if a back-off was started for exactly this offset
+   *         and has not been reset since
+   */
+  boolean isStarted(long offset)
+  {
+    return this.offset == offset && backOffExecution != null;
+  }
+
+  /**
+   * @param offset the offset to check
+   * @return {@code true}, if the given offset is the tracked one and no
+   *         further retry is scheduled for it
+   */
+  boolean isCompleted(long offset)
+  {
+    return this.offset == offset && timeNextRetryIsDue == null;
+  }
+
+  /**
+   * Forgets the tracked offset and everything, that belongs to its back-off.
+   */
+  void reset()
+  {
+    timeNextRetryIsDue = null;
+    // Drop the finished execution, so that no stale state survives the reset
+    backOffExecution = null;
+    numRetries = 0;
+    offset = -1L;
+  }
+
+  /**
+   * Requests the next back-off interval from the running execution and
+   * schedules the next retry accordingly, or clears the schedule, if the
+   * execution signals {@link BackOffExecution#STOP}.
+   */
+  private void initializeNextBackOff()
+  {
+    long backOffMillis = backOffExecution.nextBackOff();
+
+    if (backOffMillis == BackOffExecution.STOP)
+    {
+      timeNextRetryIsDue = null;
+    }
+    else
+    {
+      numRetries++;
+      timeNextRetryIsDue = clock.instant().plusMillis(backOffMillis);
+    }
+  }
+}
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RecordDeserializationException;
import org.apache.kafka.common.errors.WakeupException;
+import org.springframework.util.backoff.BackOff;
+import java.time.Clock;
import java.time.Duration;
+import java.time.Instant;
import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
@Slf4j
-public class ExampleConsumer<K, V> implements Runnable
+public class ExampleConsumer<K, V> implements ConsumerRebalanceListener, Runnable
{
private final String id;
private final String topic;
private final Consumer<K, V> consumer;
private final RecordHandler<K, V> recordHandler;
private final Thread workerThread;
+ private final Clock clock;
+ private final Duration pollRequestTimeout;
+ private final Duration maxPollInterval;
+ private final Duration minTimeForNextRecord;
+ private final BackOff backOffStrategy;
+ private final BackOffState[] backOffState;
private final Runnable closeCallback;
private volatile boolean running = false;
String topic,
Consumer<K, V> consumer,
RecordHandler<K, V> recordHandler,
+ Clock clock,
+ Duration pollRequestTimeout,
+ Duration maxPollInterval,
+ Duration maxTimePerRecord,
+ Duration minSlackPerPollInterval,
+ BackOff backOffStrategy,
Runnable closeCallback)
{
this.id = clientId;
this.topic = topic;
this.consumer = consumer;
this.recordHandler = recordHandler;
+ this.clock = clock;
+ this.pollRequestTimeout = pollRequestTimeout;
+ this.maxPollInterval = maxPollInterval;
+ this.minTimeForNextRecord = maxTimePerRecord.plus(minSlackPerPollInterval);
+ this.backOffStrategy = backOffStrategy;
+
+ int numPartitions = consumer.partitionsFor(topic).size();
+ log.info("{} - Topic {} has {} partitions", id, topic, numPartitions);
+ this.backOffState = new BackOffState[numPartitions];
workerThread = new Thread(this, "ExampleConsumer Worker-Thread");
workerThread.start();
try
{
log.info("{} - Subscribing to topic {}", id, topic);
- consumer.subscribe(Arrays.asList(topic));
+ consumer.subscribe(Arrays.asList(topic), this);
running = true;
while (running)
{
try
{
- ConsumerRecords<K, V> records = consumer.poll(Duration.ofSeconds(1));
+ ConsumerRecords<K, V> records = consumer.poll(pollRequestTimeout);
log.info("{} - Received {} messages", id, records.count());
- for (ConsumerRecord<K, V> record : records)
+
+ Instant deadline = clock.instant().plus(maxPollInterval);
+ boolean abortCurrentPoll = false;
+
+ for (TopicPartition topicPartition : records.partitions())
{
- handleRecord(
- record.topic(),
- record.partition(),
- record.offset(),
- record.key(),
- record.value());
+ if (backOffState[topicPartition.partition()].isWaitingForNextRetry())
+ {
+ log.info("{} - {} is blocked, because it is waiting for a retry", id, topicPartition);
+ consumer.seek(topicPartition, backOffState[topicPartition.partition()].getOffset());
+ continue;
+ }
+
+ List<ConsumerRecord<K, V>> recordsForPartition = records.records(topicPartition);
+ log.debug(
+ "{} - Received {} messages for partition {}",
+ id,
+ recordsForPartition.size(),
+ topicPartition);
+
+ for (ConsumerRecord<K, V> record : recordsForPartition)
+ {
+ if (abortCurrentPoll)
+ {
+ consumer.seek(topicPartition, record.offset());
+ break;
+ }
+
+ Instant now = clock.instant();
+ Duration timeLeft = Duration.between(now, deadline);
+ log.trace("{} - Time left for current poll: {}", id, timeLeft);
+
+ if (timeLeft.minus(minTimeForNextRecord).isNegative())
+ {
+ log.info(
+ "{} - Aborting record handling, because only {} are left until the poll-interval expires!",
+ id,
+ timeLeft);
+ abortCurrentPoll = true;
+ consumer.seek(topicPartition, record.offset());
+ break;
+ }
+
+ try
+ {
+ handleRecord(
+ record.topic(),
+ record.partition(),
+ record.offset(),
+ record.key(),
+ record.value());
+ }
+ catch (RetriableErrorException e)
+ {
+ // Seeking to the offset of the record, that raised the exception, and
+ // leaving the loop afterwards, retries the record
+ int partition = topicPartition.partition();
+ if (!backOffState[partition].isStarted(record.offset()))
+ {
+ log.info("{} - First occurrence of a retryable error: {}", id, e.toString());
+ backOffState[partition].start(record.offset());
+ consumer.seek(topicPartition, record.offset());
+ break;
+ }
+ else
+ {
+ if (backOffState[partition].isCompleted(record.offset()))
+ {
+ log.warn("{} - Ignoring retryable error: {}", id, e.toString());
+ }
+ else
+ {
+ log.info(
+ "{} - Retry in progress for offset={} in {}, error: {}",
+ id,
+ record.offset(),
+ partition,
+ e.toString());
+ consumer.seek(topicPartition, record.offset());
+ break;
+ }
+ }
+ }
+ catch (NonRetriableErrorException e)
+ {
+ // Just ignore, to skip
+ log.warn("{} - Ignoring non-retryable error: {}", id, e.toString());
+ }
+
+ backOffState[topicPartition.partition()].reset();
+ }
}
}
catch (RecordDeserializationException e)
Integer partition,
Long offset,
K key,
- V value)
+ V value) throws RetriableErrorException, NonRetriableErrorException
{
consumed++;
log.info("{} - partition={}-{}, offset={}: {}={}", id, topic, partition, offset, key, value);
}
+ /**
+  * Rebalance callback: creates a fresh {@link BackOffState} for every newly
+  * assigned partition. The state is stored under the index of the partition.
+  *
+  * @param partitions the partitions, that were assigned to this consumer
+  */
+ @Override
+ public void onPartitionsAssigned(Collection<TopicPartition> partitions)
+ {
+   for (TopicPartition assigned : partitions)
+   {
+     String logPrefix = id + " - partition=" + assigned.partition();
+     backOffState[assigned.partition()] = new BackOffState(logPrefix, clock, backOffStrategy);
+   }
+ }
+
+ /**
+  * Rebalance callback: drops the back-off state of every revoked partition
+  * by clearing its slot.
+  *
+  * @param partitions the partitions, that were revoked from this consumer
+  */
+ @Override
+ public void onPartitionsRevoked(Collection<TopicPartition> partitions)
+ {
+   for (TopicPartition revoked : partitions)
+   {
+     backOffState[revoked.partition()] = null;
+   }
+ }
+
+
public void shutdown() throws InterruptedException
{
log.info("{} joining the worker-thread...", id);
--- /dev/null
+package de.juplo.kafka;
+
+/**
+ * Checked exception, that a {@code RecordHandler} can throw for a record,
+ * that must not be retried. The {@code ExampleConsumer} logs a warning and
+ * continues with the next record.
+ */
+public class NonRetriableErrorException extends Exception
+{
+  /**
+   * @param message the detail message, that describes the error
+   */
+  public NonRetriableErrorException(final String message)
+  {
+    super(message);
+  }
+}
Integer partition,
Long offset,
K key,
- V value);
+ V value) throws RetriableErrorException, NonRetriableErrorException;
}
--- /dev/null
+package de.juplo.kafka;
+
+/**
+ * Checked exception, that a {@code RecordHandler} can throw for a record,
+ * whose handling may succeed on a later attempt. The {@code ExampleConsumer}
+ * seeks back to the offset of the record, in order to retry it.
+ */
+public class RetriableErrorException extends Exception
+{
+  /**
+   * @param message the detail message, that describes the error
+   */
+  public RetriableErrorException(final String message)
+  {
+    super(message);
+  }
+}
topic: test
auto-offset-reset: earliest
auto-commit-interval: 5s
+ poll-request-timeout: 1s
+ max-poll-interval: 5m
+ max-poll-records: 500
+ fetch-max-wait: 500ms
+ max-time-per-record: 30s
+ min-slack-per-poll-interval: 1s
+ num-retries: 10
management:
endpoint:
shutdown:
--- /dev/null
+package de.juplo.kafka;
+
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.springframework.util.backoff.BackOff;
+import org.springframework.util.backoff.BackOffExecution;
+
+import java.time.Clock;
+import java.time.Instant;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.BDDMockito.given;
+
+
+/**
+ * Unit tests for {@link BackOffState}.
+ * <p>
+ * The {@link Clock}, the {@link BackOff} strategy and the
+ * {@link BackOffExecution} are Mockito mocks, so that every test controls the
+ * current time and the next back-off interval explicitly.
+ */
+@ExtendWith(MockitoExtension.class)
+class BackOffStateTest
+{
+  final static String ID = "TEST";
+  final static long OFFSET = 666;
+  final static Instant NOW = Instant.now();
+  final static long BACK_OFF = 1000L;
+
+
+  @Mock Clock clock;
+  @Mock BackOff backOff;
+  @Mock BackOffExecution backOffExecution;
+
+
+  /** Creates a state, on which {@code start()} was never called. */
+  private BackOffState notStartedBackOffState()
+  {
+    return new BackOffState(ID, clock, backOff);
+  }
+
+  /** Creates a state, whose strategy answers with STOP right away. */
+  private BackOffState startedBackOffStateWithNoRetries()
+  {
+    // GIVEN
+    given(backOff.start()).willReturn(backOffExecution);
+    given(backOffExecution.nextBackOff()).willReturn(BackOffExecution.STOP);
+
+    // WHEN
+    BackOffState backOffState = new BackOffState(ID, clock, backOff);
+    backOffState.start(OFFSET);
+
+    return backOffState;
+  }
+
+  /** Creates a state, that was started at NOW with a first back-off of BACK_OFF ms. */
+  private BackOffState startedBackOffStateWithRetries()
+  {
+    // GIVEN
+    given(clock.instant()).willReturn(NOW);
+    given(backOff.start()).willReturn(backOffExecution);
+    given(backOffExecution.nextBackOff()).willReturn(BACK_OFF);
+
+    // WHEN
+    BackOffState backOffState = new BackOffState(ID, clock, backOff);
+    backOffState.start(OFFSET);
+
+    return backOffState;
+  }
+
+  /**
+   * Moves the clock exactly onto the instant, at which the retry is due.
+   * The remaining time is zero then, which still counts as waiting.
+   */
+  private void givenRetryIsNotYetDue()
+  {
+    given(clock.instant()).willReturn(NOW.plusMillis(BACK_OFF));
+  }
+
+  /**
+   * Moves the clock 1 ms past the due time and defines, what the back-off
+   * execution answers, when it is asked for the next interval.
+   */
+  private void givenRetryIsDue(long nextBackOff)
+  {
+    given(clock.instant()).willReturn(NOW.plusMillis(BACK_OFF).plusMillis(1));
+    given(backOffExecution.nextBackOff()).willReturn(nextBackOff);
+  }
+
+
+  @Test
+  @DisplayName("A not started BackOffState is not waiting for a retry")
+  void notStartedBackOffStateIsNotWaitingForRetry()
+  {
+    // GIVEN
+    BackOffState backOffState = notStartedBackOffState();
+
+    // THEN
+    assertThat(backOffState.isWaitingForNextRetry()).isFalse();
+  }
+
+  @Test
+  @DisplayName("A not started BackOffState is not started")
+  void notStartedBackOffStateIsNotStarted()
+  {
+    // GIVEN
+    BackOffState backOffState = notStartedBackOffState();
+
+    // THEN
+    assertThat(backOffState.isStarted(OFFSET)).isFalse();
+  }
+
+
+  @Test
+  @DisplayName("A started BackOffState with no retries is not waiting for a retry")
+  void startedBackOffStateWithNoRetriesIsNotWaitingForRetry()
+  {
+    // GIVEN
+    BackOffState backOffState = startedBackOffStateWithNoRetries();
+
+    // THEN
+    assertThat(backOffState.isWaitingForNextRetry()).isFalse();
+  }
+
+  @Test
+  @DisplayName("A started BackOffState with no retries is started")
+  void startedBackOffStateWithNoRetriesIsStarted()
+  {
+    // GIVEN
+    BackOffState backOffState = startedBackOffStateWithNoRetries();
+
+    // THEN
+    assertThat(backOffState.isStarted(OFFSET)).isTrue();
+  }
+
+  @Test
+  @DisplayName("A started BackOffState with no retries is completed")
+  void startedBackOffStateWithNoRetriesIsCompleted()
+  {
+    // GIVEN
+    BackOffState backOffState = startedBackOffStateWithNoRetries();
+
+    // THEN
+    assertThat(backOffState.isCompleted(OFFSET)).isTrue();
+  }
+
+
+  @Test
+  @DisplayName("A started BackOffState is waiting for a retry if the time is not due")
+  void startedBackOffStateIsWaitingForRetryIfTimeIsNotDue()
+  {
+    // GIVEN
+    BackOffState backOffState = startedBackOffStateWithRetries();
+    givenRetryIsNotYetDue();
+
+    // WHEN
+    boolean result = backOffState.isWaitingForNextRetry();
+
+    // THEN
+    assertThat(result).isTrue();
+  }
+
+  @Test
+  @DisplayName("A started BackOffState is started if the time is not due")
+  void startedBackOffStateIsStarted()
+  {
+    // GIVEN
+    BackOffState backOffState = startedBackOffStateWithRetries();
+    givenRetryIsNotYetDue();
+
+    // WHEN
+    backOffState.isWaitingForNextRetry();
+
+    // THEN
+    assertThat(backOffState.isStarted(OFFSET)).isTrue();
+  }
+
+  @Test
+  @DisplayName("A started BackOffState is not completed if the time is not due")
+  void startedBackOffStateIsNotCompletedIfTimeIsNotDue()
+  {
+    // GIVEN
+    BackOffState backOffState = startedBackOffStateWithRetries();
+    givenRetryIsNotYetDue();
+
+    // WHEN
+    backOffState.isWaitingForNextRetry();
+
+    // THEN
+    assertThat(backOffState.isCompleted(OFFSET)).isFalse();
+  }
+
+
+  @Test
+  @DisplayName("A started BackOffState is not waiting for a retry if the time is due but the retry not yet completed")
+  void startedBackOffStateIsNotWaitingForRetryIfTheTimeIsDueButRetryNotCompleted()
+  {
+    // GIVEN
+    BackOffState backOffState = startedBackOffStateWithRetries();
+    givenRetryIsDue(BACK_OFF);
+
+    // WHEN
+    boolean result = backOffState.isWaitingForNextRetry();
+
+    // THEN
+    assertThat(result).isFalse();
+  }
+
+  @Test
+  @DisplayName("A started BackOffState is started if the time is due but the retry not yet completed")
+  void startedBackOffStateIsStartedIfTheTimeIsDueButRetryNotCompleted()
+  {
+    // GIVEN
+    BackOffState backOffState = startedBackOffStateWithRetries();
+    givenRetryIsDue(BACK_OFF);
+
+    // WHEN
+    backOffState.isWaitingForNextRetry();
+
+    // THEN
+    assertThat(backOffState.isStarted(OFFSET)).isTrue();
+  }
+
+  @Test
+  @DisplayName("A started BackOffState is not completed if the time is due but the retry not yet completed")
+  void startedBackOffStateIsNotCompletedIfTheTimeIsDueButRetryNotCompleted()
+  {
+    // GIVEN
+    BackOffState backOffState = startedBackOffStateWithRetries();
+    givenRetryIsDue(BACK_OFF);
+
+    // WHEN
+    backOffState.isWaitingForNextRetry();
+
+    // THEN
+    assertThat(backOffState.isCompleted(OFFSET)).isFalse();
+  }
+
+
+  @Test
+  @DisplayName("A started BackOffState is not waiting for a retry if the time is due and the retry is completed")
+  void startedBackOffStateIsNotWaitingForRetryIfTheTimeIsDueAndRetryIsCompleted()
+  {
+    // GIVEN
+    BackOffState backOffState = startedBackOffStateWithRetries();
+    givenRetryIsDue(BackOffExecution.STOP);
+
+    // WHEN
+    boolean result = backOffState.isWaitingForNextRetry();
+
+    // THEN
+    assertThat(result).isFalse();
+  }
+
+  @Test
+  @DisplayName("A started BackOffState is started if the time is due and the retry is completed")
+  void startedBackOffStateIsStartedIfTheTimeIsDueAndRetryIsCompleted()
+  {
+    // GIVEN
+    BackOffState backOffState = startedBackOffStateWithRetries();
+    givenRetryIsDue(BackOffExecution.STOP);
+
+    // WHEN
+    backOffState.isWaitingForNextRetry();
+
+    // THEN
+    assertThat(backOffState.isStarted(OFFSET)).isTrue();
+  }
+
+  @Test
+  @DisplayName("A started BackOffState is completed if the time is due and the retry is completed")
+  void startedBackOffStateIsCompletedIfTheTimeIsDueAndRetryIsCompleted()
+  {
+    // GIVEN
+    BackOffState backOffState = startedBackOffStateWithRetries();
+    givenRetryIsDue(BackOffExecution.STOP);
+
+    // WHEN
+    backOffState.isWaitingForNextRetry();
+
+    // THEN
+    assertThat(backOffState.isCompleted(OFFSET)).isTrue();
+  }
+
+
+  @Test
+  @DisplayName("A started BackOffState is not waiting for a retry after a reset")
+  void startedBackOffStateIsNotWaitingForRetryAfterReset()
+  {
+    // GIVEN
+    BackOffState backOffState = startedBackOffStateWithRetries();
+
+    // WHEN
+    backOffState.reset();
+
+    // THEN
+    assertThat(backOffState.isWaitingForNextRetry()).isFalse();
+  }
+
+  @Test
+  @DisplayName("A started BackOffState is not started after a reset")
+  void startedBackOffStateIsNotStartedAfterReset()
+  {
+    // GIVEN
+    BackOffState backOffState = startedBackOffStateWithRetries();
+
+    // WHEN
+    backOffState.reset();
+
+    // THEN
+    assertThat(backOffState.isStarted(OFFSET)).isFalse();
+  }
+}
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.ValueSource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.context.EmbeddedKafka;
+import org.springframework.util.backoff.BackOff;
+import org.springframework.util.backoff.FixedBackOff;
+import java.time.Clock;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
-import static de.juplo.kafka.ExampleConsumerTest.NUM_PARTITIONS;
-import static de.juplo.kafka.ExampleConsumerTest.TOPIC;
+import static de.juplo.kafka.ExampleConsumerTest.*;
+import static org.assertj.core.api.Assertions.assertThat;
@SpringBootTest(
},
properties = {
"juplo.bootstrap-server=${spring.embedded.kafka.brokers}",
+ "juplo.consumer.poll-request-timeout=" + POLL_REQUEST_TIMEOUT_MS + "ms",
+ "juplo.consumer.max-poll-interval=" + MAX_POLL_INTERVALL_MS + "ms",
+ "juplo.consumer.max-time-per-record=" + MAX_TIME_PER_RECORD_MS + "ms",
+ "juplo.consumer.fetch-max-wait=" + FETCH_MAX_WAIT_MS + "ms",
+ "juplo.consumer.min-slack-per-poll-interval=" + MIN_SLACK_PER_POLL_INTERVAL_MS + "ms",
+ "juplo.consumer.num-retries=" + NUM_RETRIES,
"spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.ByteArraySerializer",
"logging.level.de.juplo.kafka=TRACE",
})
@EmbeddedKafka(topics = TOPIC, partitions = NUM_PARTITIONS)
public class ExampleConsumerTest
{
- @DisplayName("All messages are consumed")
+ @DisplayName("All messages are consumed as expected")
@Test
void testOnlyValidMessages()
{
+ createExampleConsumer();
+
sendValidMessage(0);
sendValidMessage(1);
sendValidMessage(2);
Awaitility
.await("All messages are consumed")
.atMost(Duration.ofSeconds(5))
- .until(() -> mockRecordHandler.getNumMessagesHandled() == 20);
+ .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(20));
+ }
+
+ /**
+  * Sends two rounds of valid messages to the partitions 0 to 9, while the
+  * mocked handler is configured with the given delay for normal records, and
+  * expects, that 20 messages are reported as handled within 5 seconds.
+  */
+ @DisplayName("Delayed messages are consumed as expected")
+ @ParameterizedTest(name = "delay for message-consumption: {0}")
+ @ValueSource(ints = { 10, 25, 50, 75, 100 })
+ void testOnlyValidMessagesButAllDelayed(int delay)
+ {
+   createExampleConsumer();
+   mockRecordHandler.normalRecordHandlingDelay = Duration.ofMillis(delay);
+
+   for (int round = 0; round < 2; round++)
+   {
+     for (int partition = 0; partition <= 9; partition++)
+     {
+       sendValidMessage(partition);
+     }
+   }
+
+   Awaitility
+     .await("All messages are consumed")
+     .atMost(Duration.ofSeconds(5))
+     .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(20));
}
@DisplayName("A deserialization exception is skipped and all valid messages are consumed")
@Test
void testDeserializationException()
{
+ createExampleConsumer();
+
sendValidMessage(0);
sendValidMessage(1);
sendValidMessage(2);
Awaitility
.await("All valid messages are consumed")
.atMost(Duration.ofSeconds(15))
- .until(() -> mockRecordHandler.getNumMessagesHandled() == 19);
+ .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(19));
}
@DisplayName("A message, that triggers an unexpected exception in the domain-logic, exits the application")
@Test
void testUnexpectedDomainError() throws Exception
{
+ createExampleConsumer();
+
sendValidMessage(0);
sendValidMessage(1);
sendValidMessage(2);
.await("The ConsumerRunnable is exited by an unexpected exception")
.atMost(Duration.ofSeconds(5))
.pollInterval(Duration.ofMillis(250))
- .until(() -> isTerminatedExceptionally.get());
+ .untilAsserted(() -> assertThat(isTerminatedExceptionally.get()).isTrue());
+ }
+
+ /**
+  * Sends two rounds of messages to the partitions 0 to 9. In the first round,
+  * partition 3 receives a message, that triggers a non-retriable exception.
+  * Expects, that 19 messages are reported as handled within 15 seconds.
+  */
+ @DisplayName("A message, that triggers an non-retryable exception in the domain-logic, is skipped and all other messages are consumed")
+ @Test
+ void testNonRetryableDomainError() throws Exception
+ {
+   createExampleConsumer();
+
+   // First round: the failing message goes to partition 3
+   for (int partition = 0; partition <= 9; partition++)
+   {
+     if (partition == 3)
+     {
+       sendMessageThatTriggersNonRetriableExceptionInDomain(partition);
+     }
+     else
+     {
+       sendValidMessage(partition);
+     }
+   }
+   // Second round: only valid messages
+   for (int partition = 0; partition <= 9; partition++)
+   {
+     sendValidMessage(partition);
+   }
+
+   Awaitility
+     .await("All other valid messages are consumed")
+     .atMost(Duration.ofSeconds(15))
+     .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(19));
+ }
+
+ /**
+  * Sends two rounds of messages to the partitions 0 to 9. In the first round,
+  * partition 3 receives a message, that triggers a retriable exception for
+  * the given number of failures. Expects, that 20 messages are reported as
+  * handled within 15 seconds.
+  */
+ @DisplayName("A message, that triggers a retryable exception in the domain-logic, is retried and all messages are eventually consumed")
+ @ParameterizedTest(name = "Number of failures for the 1. message in partition 3: {0}")
+ @ValueSource(ints = { 1, 2, 3, 4, 5, 6 })
+ void testOneMessageCausesRetryableDomainErrors(int numFailures)
+ {
+   createExampleConsumer();
+
+   // First round: the failing message goes to partition 3
+   for (int partition = 0; partition <= 9; partition++)
+   {
+     if (partition == 3)
+     {
+       sendMessageThatTriggersRetriableExceptionInDomain(partition, numFailures);
+     }
+     else
+     {
+       sendValidMessage(partition);
+     }
+   }
+   // Second round: only valid messages
+   for (int partition = 0; partition <= 9; partition++)
+   {
+     sendValidMessage(partition);
+   }
+
+   Awaitility
+     .await("All messages are eventually consumed")
+     .atMost(Duration.ofSeconds(15))
+     .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(20));
+ }
+
+ /**
+  * Sends 20 messages to partition 3: 18 valid messages, one message, that
+  * triggers a retriable exception once, and a final valid message. The
+  * mocked handler uses the given delay for normal records and 100 ms for
+  * exceptional ones. Expects, that 20 messages are reported as handled
+  * within 15 seconds.
+  */
+ @DisplayName("All messages on a partition are delayed and a message, that triggers a retryable exception in the domain-logic, is retried and all messages are eventually consumed")
+ @ParameterizedTest(name = "Delay for normal messages: {0}ms")
+ @ValueSource(ints = { 10, 20, 50, 100, 150, 200 })
+ void testOneMessageCausesRetryableDomainErrorsWhileAllMessagesAreDelayed(int delay)
+ {
+   createExampleConsumer();
+   mockRecordHandler.normalRecordHandlingDelay = Duration.ofMillis(delay);
+   mockRecordHandler.exceptionalRecordHandlingDelay = Duration.ofMillis(100);
+
+   for (int i = 0; i < 18; i++)
+   {
+     sendValidMessage(3);
+   }
+   sendMessageThatTriggersRetriableExceptionInDomain(3, 1);
+   sendValidMessage(3);
+
+   Awaitility
+     .await("All messages are eventually consumed")
+     .atMost(Duration.ofSeconds(15))
+     .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(20));
+ }
+
+ /**
+  * Sends two rounds of messages to the partitions 0 to 9. In the first round,
+  * partition 3 receives a message, that triggers a retriable exception with
+  * 66 failures. Expects, that 19 messages are reported as handled within
+  * 15 seconds.
+  */
+ @DisplayName("A message, that triggers a retryable exception in the domain-logic, but fails too often, is skipped and all other messages are eventually consumed")
+ @Test
+ void testOneMessageCausesRetryableDomainErrors()
+ {
+   createExampleConsumer();
+
+   // First round: the message, that fails too often, goes to partition 3
+   for (int partition = 0; partition <= 9; partition++)
+   {
+     if (partition == 3)
+     {
+       sendMessageThatTriggersRetriableExceptionInDomain(partition, 66);
+     }
+     else
+     {
+       sendValidMessage(partition);
+     }
+   }
+   // Second round: only valid messages
+   for (int partition = 0; partition <= 9; partition++)
+   {
+     sendValidMessage(partition);
+   }
+
+   Awaitility
+     .await("All other messages are eventually consumed")
+     .atMost(Duration.ofSeconds(15))
+     .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(19));
+ }
+
+ /**
+  * Sends 20 messages to partition 3, mixing valid messages with messages,
+  * that trigger a retriable exception for a varying number of failures.
+  * Four of the messages are sent with 66 failures. Expects, that 16 messages
+  * are reported as handled within 15 seconds.
+  */
+ @DisplayName("Several messages are triggering retryable exception in one partition, some of them fail so often, that they are skipped and all successful messages are eventually consumed")
+ @ParameterizedTest(name = "Delay for errors: {0}ms")
+ @ValueSource(ints = { 10, 20, 50, 100, 150, 200 })
+ void testSeveralMessagesCausesRetryableDomainErrorsInOnePartition(int delay)
+ {
+   createExampleConsumer();
+   mockRecordHandler.exceptionalRecordHandlingDelay = Duration.ofMillis(delay);
+
+   // One entry per message in sending order:
+   // 0 stands for a valid message, any other value is the number of failures
+   int[] failuresPerMessage =
+   {
+     0, 4, 0, 0, 6, 1, 0, 66, 5, 0,
+     0, 66, 6, 66, 3, 0, 66, 0, 1, 0
+   };
+   for (int failures : failuresPerMessage)
+   {
+     if (failures == 0)
+     {
+       sendValidMessage(3);
+     }
+     else
+     {
+       sendMessageThatTriggersRetriableExceptionInDomain(3, failures);
+     }
+   }
+
+   Awaitility
+     .await("All other messages are eventually consumed")
+     .atMost(Duration.ofSeconds(15))
+     .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(16));
+ }
+
+ /**
+  * Sends three rounds of messages to the partitions 0 to 9. Message A
+  * (partition 3, first round), message B (partition 6, first round) and
+  * message C (partition 6, second round) trigger retriable exceptions for
+  * the given numbers of failures. Expects, that 30 messages are reported as
+  * handled within 20 seconds.
+  */
+ @DisplayName("Three messages, that trigger retryable exceptions in the domain-logic, are retried and all messages are eventually consumed")
+ @ParameterizedTest(name = "Number of failures for the 1. message in partition 3: {0}, number of failures for the 1. message in partition 6: {1}, number of failures for the 2. message in partition 6: {2}")
+ @CsvSource({ "1,1,1", "6,3,4", "4,5,2", "1,2,3", "6,6,6" })
+ void testThreeMessagesCauseRetryableDomainErrors(
+   int numFailuresForMessageA,
+   int numFailuresForMessageB,
+   int numFailuresForMessageC)
+ {
+   createExampleConsumer();
+
+   // First round: failing messages in the partitions 3 and 6
+   for (int partition = 0; partition <= 9; partition++)
+   {
+     if (partition == 3)
+     {
+       sendMessageThatTriggersRetriableExceptionInDomain(partition, numFailuresForMessageA);
+     }
+     else if (partition == 6)
+     {
+       sendMessageThatTriggersRetriableExceptionInDomain(partition, numFailuresForMessageB);
+     }
+     else
+     {
+       sendValidMessage(partition);
+     }
+   }
+   // Second round: failing message in partition 6
+   for (int partition = 0; partition <= 9; partition++)
+   {
+     if (partition == 6)
+     {
+       sendMessageThatTriggersRetriableExceptionInDomain(partition, numFailuresForMessageC);
+     }
+     else
+     {
+       sendValidMessage(partition);
+     }
+   }
+   // Third round: only valid messages
+   for (int partition = 0; partition <= 9; partition++)
+   {
+     sendValidMessage(partition);
+   }
+
+   Awaitility
+     .await("All messages are eventually consumed")
+     .atMost(Duration.ofSeconds(20))
+     .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(30));
+ }
+
+ /**
+  * Sends three rounds of messages to the partitions 0 to 9. Message A
+  * (partition 3, first round), message B (partition 6, first round) and
+  * message C (partition 6, second round) trigger retriable exceptions for
+  * the given numbers of failures; in every parameter set, one of them is 66.
+  * Expects, that 29 messages are reported as handled within 20 seconds.
+  */
+ @DisplayName("Three messages, that trigger retryable exceptions in the domain-logic, are retried, but one of them fails too often and is skipped, still all other messages are eventually consumed")
+ @ParameterizedTest(name = "Number of failures for the 1. message in partition 3: {0}, number of failures for the 1. message in partition 6: {1}, number of failures for the 2. message in partition 6: {2}")
+ @CsvSource({ "66,3,4", "4,66,2", "1,2,66" })
+ void testThreeMessagesCauseRetryableDomainErrorsAndOneFailsTooOften(
+   int numFailuresForMessageA,
+   int numFailuresForMessageB,
+   int numFailuresForMessageC)
+ {
+   createExampleConsumer();
+
+   // First round: failing messages in the partitions 3 and 6
+   for (int partition = 0; partition <= 9; partition++)
+   {
+     if (partition == 3)
+     {
+       sendMessageThatTriggersRetriableExceptionInDomain(partition, numFailuresForMessageA);
+     }
+     else if (partition == 6)
+     {
+       sendMessageThatTriggersRetriableExceptionInDomain(partition, numFailuresForMessageB);
+     }
+     else
+     {
+       sendValidMessage(partition);
+     }
+   }
+   // Second round: failing message in partition 6
+   for (int partition = 0; partition <= 9; partition++)
+   {
+     if (partition == 6)
+     {
+       sendMessageThatTriggersRetriableExceptionInDomain(partition, numFailuresForMessageC);
+     }
+     else
+     {
+       sendValidMessage(partition);
+     }
+   }
+   // Third round: only valid messages
+   for (int partition = 0; partition <= 9; partition++)
+   {
+     sendValidMessage(partition);
+   }
+
+   Awaitility
+     .await("All other messages are eventually consumed")
+     .atMost(Duration.ofSeconds(20))
+     .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(29));
+ }
+
+ /**
+  * Creates the consumer with a {@link FixedBackOff} of the given interval
+  * and 3 attempts, and sends two rounds of messages to the partitions 0 to 9.
+  * In the first round, partition 3 receives a message, that triggers a
+  * retriable exception with 3 failures. Expects, that 20 messages are
+  * reported as handled within 15 seconds.
+  */
+ @DisplayName("A message, that triggers a retryable exception in the domain-logic, is retried 3 times with a fixed back-of and all messages are eventually consumed")
+ @ParameterizedTest(name = "Back-Off millis: {0}")
+ @ValueSource(ints = { 100, 250, 500, 1000 })
+ void testOneMessageIsRetriedWithFixedBackOff(int backOffMillis)
+ {
+   BackOff fixedBackOff = new FixedBackOff(backOffMillis, 3);
+   createExampleConsumer(fixedBackOff);
+
+   // First round: the failing message goes to partition 3
+   for (int partition = 0; partition <= 9; partition++)
+   {
+     if (partition == 3)
+     {
+       sendMessageThatTriggersRetriableExceptionInDomain(partition, 3);
+     }
+     else
+     {
+       sendValidMessage(partition);
+     }
+   }
+   // Second round: only valid messages
+   for (int partition = 0; partition <= 9; partition++)
+   {
+     sendValidMessage(partition);
+   }
+
+   Awaitility
+     .await("All messages are eventually consumed")
+     .atMost(Duration.ofSeconds(15))
+     .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(20));
}
static final String ID = "TEST";
static final String TOPIC = "ExampleConsumerTest_TEST";
static final int NUM_PARTITIONS = 10;
+ // NOTE(review): the _MS suffix suggests, that the timing values below are milliseconds;
+ // where they are wired into the consumer configuration is not visible here - confirm.
+ // NOTE(review): MAX_POLL_INTERVALL_MS is spelled with a double L; a rename has to update every reference.
+ static final int NUM_RETRIES = 6;
+ static final int POLL_REQUEST_TIMEOUT_MS = 50;
+ static final int MAX_POLL_INTERVALL_MS = 500;
+ static final int MAX_TIME_PER_RECORD_MS = 100;
+ static final int FETCH_MAX_WAIT_MS = 50;
+ static final int MIN_SLACK_PER_POLL_INTERVAL_MS = 100;
@Autowired
KafkaTemplate<String, byte[]> kafkaTemplate;
+ @Autowired ApplicationProperties properties;
+ @Autowired Clock clock;
final LongSerializer serializer = new LongSerializer();
final long[] currentOffsets = new long[NUM_PARTITIONS];
ExampleConsumer exampleConsumer;
- @BeforeEach
- void createExampleConsumer(@Autowired ApplicationProperties properties)
+ void createExampleConsumer()
+ {
+ // Default setup: retry without any pause, as often as configured in the application properties.
+ // The long literal is written as 0L, because a lowercase suffix is easily misread as the digit 1.
+ createExampleConsumer(new FixedBackOff(0L, properties.getConsumerProperties().getNumRetries()));
+ }
+
+ void createExampleConsumer(BackOff backOff)
{
ApplicationConfiguration configuration = new ApplicationConfiguration();
Consumer<String, Long> consumer = configuration.kafkaConsumer(properties);
TOPIC,
consumer,
mockRecordHandler,
+ clock,
+ properties.getConsumerProperties().getPollRequestTimeout(),
+ properties.getConsumerProperties().getMaxPollInterval(),
+ properties.getConsumerProperties().getMaxTimePerRecord(),
+ properties.getConsumerProperties().getMinSlackPerPollInterval(),
+ backOff,
() -> isTerminatedExceptionally.set(true));
}
+ @BeforeEach
+ void resetParameters()
+ {
+ mockRecordHandler.normalRecordHandlingDelay = Duration.ofMillis(0);
+ mockRecordHandler.exceptionalRecordHandlingDelay = Duration.ofMillis(0);
+ }
+
@AfterEach
void resetSetup(@Autowired AdminClient adminClient) throws InterruptedException, ExecutionException
{
send(partition, VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION);
}
+ private void sendMessageThatTriggersNonRetriableExceptionInDomain(int partition)
+ {
+ // The serialized marker value makes the mocked handler throw a NonRetriableErrorException
+ long markerValue = (long)VALUE_THAT_TRIGGERS_NONRETRIABLEEXCEPTION;
+ byte[] payload = serializer.serialize(TOPIC, markerValue);
+ send(partition, payload);
+ }
+
+ private void sendMessageThatTriggersRetriableExceptionInDomain(int partition, int numFailures)
+ {
+ // The number of simulated failures is encoded as a multiple of the marker value
+ long encodedFailures = (long)VALUE_THAT_TRIGGERS_RETRIABLEEXCEPTION * numFailures;
+ byte[] payload = serializer.serialize(TOPIC, encodedFailures);
+ send(partition, payload);
+ }
+
private void send(int partition, long message)
{
send(partition, serializer.serialize(TOPIC, message));
public final static int VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION = -1;
+ // Multiples of this value make MockRecordHandler throw a RetriableErrorException; the factor is the number of failures
+ public final static int VALUE_THAT_TRIGGERS_RETRIABLEEXCEPTION = -2;
+ // Exactly this value makes MockRecordHandler throw a NonRetriableErrorException
+ public final static int VALUE_THAT_TRIGGERS_NONRETRIABLEEXCEPTION = -3;
+
@TestConfiguration
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBroker);
return AdminClient.create(properties);
}
+
+ @Bean
+ Clock clock()
+ {
+ return Clock.systemDefaultZone();
+ }
}
}
package de.juplo.kafka;
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import static de.juplo.kafka.ExampleConsumerTest.VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+
+import static de.juplo.kafka.ExampleConsumerTest.*;
-@RequiredArgsConstructor
@Slf4j
public class MockRecordHandler implements RecordHandler<String, Long>
{
- @Getter
+ private final Map<OffsetInPartition, Integer> retriableErrors = new HashMap<>();
+
+ Duration normalRecordHandlingDelay;
+ Duration exceptionalRecordHandlingDelay;
+
private int numMessagesHandled = 0;
+ // Returns the number of records, that handleRecord() has completed since the last call to clear().
+ // NOTE(review): the counter is a plain int; if it is read from another thread than the handling one, visibility is not guaranteed - confirm.
+ public int getNumMessagesHandled()
+ {
+ return this.numMessagesHandled;
+ }
+
@Override
public void handleRecord(
String topic,
Integer partition,
Long offset,
String key,
- Long value)
+ Long value) throws RetriableErrorException, NonRetriableErrorException
{
+ // Negative values select a simulated failure: generateError() either throws
+ // or returns, if the failure counts as resolved or the value is not mapped
if (value != null && value < 0)
{
- generateError(value);
+ generateError(new OffsetInPartition(offset, partition), value.intValue());
}
+ // Only records, that get past the error simulation, are delayed and counted
+ sleep(normalRecordHandlingDelay);
numMessagesHandled++;
log.trace("Handled {} messages so far", numMessagesHandled);
}
- private void generateError(long value)
+ // Simulates the failure, that a negative record value stands for:
+ // - VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION throws a RuntimeException
+ // - VALUE_THAT_TRIGGERS_NONRETRIABLEEXCEPTION throws a NonRetriableErrorException
+ // - a multiple n * VALUE_THAT_TRIGGERS_RETRIABLEEXCEPTION throws a RetriableErrorException on the
+ // first n calls for the same offset and partition and returns normally on the following call
+ // - any other value is only logged
+ private void generateError(
+ OffsetInPartition offset,
+ int value) throws RetriableErrorException, NonRetriableErrorException
{
if (value == VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION)
{
throw new RuntimeException("Unexpected application error!");
}
- log.info("Not specifically mapped error: {}", value);
+ if (value == VALUE_THAT_TRIGGERS_NONRETRIABLEEXCEPTION)
+ {
+ throw new NonRetriableErrorException("Non-Retryable application error!");
+ }
+
+ // Integer remainder is exact for every int. The former cast to float rounds
+ // values beyond +/- 2^24, so that large odd values were seen as multiples.
+ if (value % VALUE_THAT_TRIGGERS_RETRIABLEEXCEPTION == 0)
+ {
+ int totalOccurrences = value / VALUE_THAT_TRIGGERS_RETRIABLEEXCEPTION;
+ // Remaining failures for this record: initialized on the first attempt, decremented on every retry
+ int occurrence = retriableErrors.compute(
+ offset,
+ (k, v) -> v == null ? totalOccurrences : v - 1);
+
+ if (occurrence <= 0)
+ {
+ // All simulated failures have happened: forget the record and let it pass
+ retriableErrors.remove(offset);
+ }
+ else
+ {
+ log.debug(
+ "Simulating occurrence #{} of {} for a retryable error at offset {} in partition {}",
+ totalOccurrences - occurrence + 1,
+ totalOccurrences,
+ offset.offset,
+ offset.partition);
+ sleep(exceptionalRecordHandlingDelay);
+ throw new RetriableErrorException("Retryable application error! Occurrence #" + (totalOccurrences - occurrence + 1));
+ }
+
+ log.info("Simulating a resolved retryable error after {} occurrences of the error", totalOccurrences);
+ }
+ else
+ {
+ log.warn("Not specifically mapped error: {}", value);
+ }
+ }
+
+ // Blocks the calling thread for the given duration; an interruption is reported as RuntimeException.
+ private void sleep(Duration duration)
+ {
+ try
+ {
+ Thread.sleep(duration);
+ }
+ catch(InterruptedException interruption)
+ {
+ // Restore the interrupt flag, before the checked exception is translated
+ Thread.currentThread().interrupt();
+ RuntimeException wrapped = new RuntimeException(interruption);
+ throw wrapped;
+ }
}
public void clear()
{
+ // Forget the pending simulated failures as well as the number of handled records
+ retriableErrors.clear();
numMessagesHandled = 0;
}
+
+
+ // Key of the retriableErrors map: identifies a single record by its offset and partition.
+ // A nested record is implicitly static, hence the modifier is not spelled out.
+ private record OffsetInPartition(long offset, int partition) {}
}