import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.util.backoff.BackOff;
+import org.springframework.util.backoff.FixedBackOff;
import java.time.Clock;
+import java.time.Duration;
+import java.util.Properties;
@Configuration
@Slf4j
public class ApplicationConfiguration
{
+ public final static String MAX_POLL_INTERVALL_CONFIG_KEY = "max.poll.interval.ms";
+ public final static Duration MAX_POLL_INTERVALL_DEFAULT_VALUE = Duration.ofMinutes(5);
@Bean
public ExampleConsumer<String, String> exampleConsumer(
Consumer<String, String> kafkaConsumer,
RecordHandler<String, String> recordHandler,
- ConsumerHealthIndicatorAwareRebalanceListener consumerHealthIndicatorAwareRebalanceListener,
ApplicationProperties properties,
KafkaProperties kafkaProperties,
+ Clock clock,
+ BackOff backOffStrategy,
ConfigurableApplicationContext applicationContext)
{
+ String maxPollIntervalMs = kafkaProperties
+ .getConsumer()
+ .getProperties()
+ .get(MAX_POLL_INTERVALL_CONFIG_KEY);
+ Duration maxPollInterval = maxPollIntervalMs == null
+ ? MAX_POLL_INTERVALL_DEFAULT_VALUE
+ : Duration.ofMillis(Integer.valueOf(maxPollIntervalMs));
+
return
new ExampleConsumer<>(
properties.getClientId(),
properties.getConsumerProperties().getTopic(),
kafkaConsumer,
recordHandler,
- consumerHealthIndicatorAwareRebalanceListener,
+ clock,
+ properties.getConsumerProperties().getPollRequestTimeout(),
+ maxPollInterval,
+ properties.getConsumerProperties().getMaxTimePerRecord(),
+ properties.getConsumerProperties().getMinSlackPerPollInterval(),
+ backOffStrategy,
() -> applicationContext.close());
}
}
@Bean
- public ConsumerHealthIndicatorAwareRebalanceListener rebalanceListener(ConsumerHealthIndicator consumerHealthIndicator)
- {
- return new ConsumerHealthIndicatorAwareRebalanceListener(consumerHealthIndicator);
- }
-
- @Bean
- public ConsumerHealthIndicator consumerHealthIndicator(Clock clock)
+ public BackOff backOffStrategy(ApplicationProperties properties)
{
- return new ConsumerHealthIndicator(clock);
+ return new FixedBackOff(0l, properties.getConsumerProperties().getNumRetries());
}
@Bean
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.validation.annotation.Validated;
+import java.time.Duration;
+
@ConfigurationProperties(prefix = "juplo")
@Validated
@NotNull
@NotEmpty
private String topic;
+ @NotNull
+ private Duration pollRequestTimeout;
+ @NotNull
+ private Duration maxTimePerRecord;
+ @NotNull
+ private Duration minSlackPerPollInterval;
+ @NotNull
+ private int numRetries;
}
}
--- /dev/null
+package de.juplo.kafka;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.util.backoff.BackOff;
+import org.springframework.util.backoff.BackOffExecution;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+
+
+@RequiredArgsConstructor
+@Slf4j
+class BackOffState
+{
+ private final String logPrefix;
+ private final Clock clock;
+ private final BackOff backOffStrategy;
+
+ @Getter
+ private long offset;
+ private BackOffExecution backOffExecution;
+ private int numRetries = 0;
+ private Instant timeNextRetryIsDue;
+
+
+ void start(long offset)
+ {
+ this.offset = offset;
+ log.info("{} - Back-Off requested for offset={}", logPrefix, offset);
+ backOffExecution = backOffStrategy.start();
+ initializeNextBackOff();
+ }
+
+ boolean isWaitingForNextRetry()
+ {
+ if (timeNextRetryIsDue == null)
+ {
+ return false;
+ }
+
+ Instant now = clock.instant();
+ Duration remaining = Duration.between(now, timeNextRetryIsDue);
+ if (remaining.isNegative())
+ {
+ log.info(
+ "{} - {}. retry for offset={}, lateness: {}",
+ logPrefix,
+ numRetries,
+ offset,
+ remaining.abs());
+ initializeNextBackOff();
+ return false;
+ }
+ else
+ {
+ log.info(
+ "{} - Next retry for offset={} is due in {}",
+ logPrefix,
+ offset,
+ remaining);
+ return true;
+ }
+ }
+
+ boolean isStarted(long offset)
+ {
+ return this.offset == offset && backOffExecution != null;
+ }
+
+ boolean isCompleted(long offset)
+ {
+ return this.offset == offset && timeNextRetryIsDue == null;
+ }
+
+ void reset()
+ {
+ timeNextRetryIsDue = null;
+ offset = -1l;
+ }
+
+ private void initializeNextBackOff()
+ {
+ long backOffMillis = backOffExecution.nextBackOff();
+
+ if (backOffMillis == BackOffExecution.STOP)
+ {
+ timeNextRetryIsDue = null;
+ }
+ else
+ {
+ numRetries++;
+ timeNextRetryIsDue = clock.instant().plusMillis(backOffMillis);
+ }
+ }
+}
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.RecordDeserializationException;
import org.apache.kafka.common.errors.WakeupException;
+import org.springframework.util.backoff.BackOff;
+import java.time.Clock;
import java.time.Duration;
+import java.time.Instant;
import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
@Slf4j
-public class ExampleConsumer<K, V> implements Runnable
+public class ExampleConsumer<K, V> implements ConsumerRebalanceListener, Runnable
{
private final String id;
private final String topic;
private final Consumer<K, V> consumer;
private final RecordHandler<K, V> recordHandler;
- private final ConsumerRebalanceListener rebalanceListener;
private final Thread workerThread;
+ private final Clock clock;
+ private final Duration pollRequestTimeout;
+ private final Duration maxPollInterval;
+ private final Duration minTimeForNextRecord;
+ private final BackOff backOffStrategy;
+ private final BackOffState[] backOffState;
private final Runnable closeCallback;
private long consumed = 0;
String topic,
Consumer<K, V> consumer,
RecordHandler<K, V> recordHandler,
- ConsumerRebalanceListener rebalanceListener,
+ Clock clock,
+ Duration pollRequestTimeout,
+ Duration maxPollInterval,
+ Duration maxTimePerRecord,
+ Duration minSlackPerPollInterval,
+ BackOff backOffStrategy,
Runnable closeCallback)
{
this.id = clientId;
this.topic = topic;
this.consumer = consumer;
this.recordHandler = recordHandler;
- this.rebalanceListener = rebalanceListener;
+ this.clock = clock;
+ this.pollRequestTimeout = pollRequestTimeout;
+ this.maxPollInterval = maxPollInterval;
+ this.minTimeForNextRecord = maxTimePerRecord.plus(minSlackPerPollInterval);
+ this.backOffStrategy = backOffStrategy;
+
+ int numPartitions = consumer.partitionsFor(topic).size();
+ log.info("{} - Topic {} has {} partitions", id, topic, numPartitions);
+ this.backOffState = new BackOffState[numPartitions];
workerThread = new Thread(this, "ExampleConsumer Worker-Thread");
workerThread.start();
try
{
log.info("{} - Subscribing to topic {}", id, topic);
- consumer.subscribe(Arrays.asList(topic), rebalanceListener);
+ consumer.subscribe(Arrays.asList(topic), this);
while (true)
{
- ConsumerRecords<K, V> records = consumer.poll(Duration.ofSeconds(1));
-
- log.info("{} - Received {} messages", id, records.count());
- for (ConsumerRecord<K, V> record : records)
+ try
+ {
+ ConsumerRecords<K, V> records = consumer.poll(pollRequestTimeout);
+
+ log.info("{} - Received {} messages", id, records.count());
+
+ Instant deadline = clock.instant().plus(maxPollInterval);
+ boolean abortCurrentPoll = false;
+
+ for (TopicPartition topicPartition : records.partitions())
+ {
+ if (backOffState[topicPartition.partition()].isWaitingForNextRetry())
+ {
+ log.info("{} - {} is blocked, because it is waiting for a retry", id, topicPartition);
+ consumer.seek(topicPartition, backOffState[topicPartition.partition()].getOffset());
+ continue;
+ }
+
+ List<ConsumerRecord<K, V>> recordsForPartition = records.records(topicPartition);
+ log.debug(
+ "{} - Received {} messages for partition {}",
+ id,
+ recordsForPartition.size(),
+ topicPartition);
+
+ for (ConsumerRecord<K, V> record : recordsForPartition)
+ {
+ if (abortCurrentPoll)
+ {
+ consumer.seek(topicPartition, record.offset());
+ break;
+ }
+
+ Instant now = clock.instant();
+ Duration timeLeft = Duration.between(now, deadline);
+ log.trace("{} - Time left for current poll: {}", id, timeLeft);
+
+ if (timeLeft.minus(minTimeForNextRecord).isNegative())
+ {
+ log.info(
+ "{} - Aborting record handling, because only {} are left until the poll-interval expires!",
+ id,
+ timeLeft);
+ abortCurrentPoll = true;
+ consumer.seek(topicPartition, record.offset());
+ break;
+ }
+
+ try
+ {
+ handleRecord(
+ record.topic(),
+ record.partition(),
+ record.offset(),
+ record.key(),
+ record.value());
+ }
+ catch (RetryableErrorException e)
+ {
+ // Seeking to the offset of the record, that raised the exception, and
+ // leaving the loop afterwards, retries the record
+ int partition = topicPartition.partition();
+ if (!backOffState[partition].isStarted(record.offset()))
+ {
+ log.info("{} - First occurrence of a retryable error: {}", id, e.toString());
+ backOffState[partition].start(record.offset());
+ consumer.seek(topicPartition, record.offset());
+ break;
+ }
+ else
+ {
+ if (backOffState[partition].isCompleted(record.offset()))
+ {
+ log.warn("{} - Ignoring retryable error: {}", id, e.toString());
+ }
+ else
+ {
+ log.info(
+ "{} - Retry in progress for offset={} in {}, error: {}",
+ id,
+ record.offset(),
+ partition,
+ e.toString());
+ consumer.seek(topicPartition, record.offset());
+ break;
+ }
+ }
+ }
+ catch (NonRetryableErrorException e)
+ {
+ // Just ignore, to skip
+ log.warn("{} - Ignoring non-retryable error: {}", id, e.toString());
+ }
+
+ backOffState[topicPartition.partition()].reset();
+ }
+ }
+ }
+ catch (RecordDeserializationException e)
{
- handleRecord(
- record.topic(),
- record.partition(),
- record.offset(),
- record.key(),
- record.value());
+ log.error(
+ "{} - Ignoring invalid record for offset {} on partition {}: {}",
+ id,
+ e.offset(),
+ e.topicPartition(),
+ e.getMessage());
+ consumer.seek(e.topicPartition(), e.offset() + 1);
}
}
}
Integer partition,
Long offset,
K key,
- V value)
+ V value) throws RetryableErrorException, NonRetryableErrorException
{
consumed++;
log.info("{} - partition={}-{}, offset={}: {}={}", id, topic, partition, offset, key, value);
}
+ @Override
+ public void onPartitionsAssigned(Collection<TopicPartition> partitions)
+ {
+ partitions.forEach(topicPartition ->
+ backOffState[topicPartition.partition()] = new BackOffState(
+ id + " - partition=" + topicPartition.partition(),
+ clock,
+ backOffStrategy));
+ }
+
+ @Override
+ public void onPartitionsRevoked(Collection<TopicPartition> partitions)
+ {
+ partitions.forEach(tp -> backOffState[tp.partition()] = null);
+ }
+
+
public void shutdown() throws InterruptedException
{
log.info("{} - Waking up the consumer", id);
--- /dev/null
+package de.juplo.kafka;
+
+public class NonRetryableErrorException extends Exception
+{
+ public NonRetryableErrorException(String message)
+ {
+ super(message);
+ }
+}
Integer partition,
Long offset,
K key,
- V value);
+ V value) throws RetryableErrorException, NonRetryableErrorException;
}
--- /dev/null
+package de.juplo.kafka;
+
+public class RetryableErrorException extends Exception
+{
+ public RetryableErrorException(String message)
+ {
+ super(message);
+ }
+}
juplo:
consumer:
topic: test
+ poll-request-timeout: 1s
+ max-time-per-record: 30s
+ min-slack-per-poll-interval: 1s
+ num-retries: 10
management:
endpoint:
health:
client-id: DEV
consumer:
group-id: my-group
+ value-deserializer: org.apache.kafka.common.serialization.LongDeserializer
+ auto-offset-reset: earliest
+ auto-commit-interval: 5s
+ max-poll-records: 500
+ fetch-max-wait: 500ms
+ properties:
+ max.poll.interval.ms: 300000
logging:
level:
root: INFO
--- /dev/null
+package de.juplo.kafka;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.RecordsToDelete;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Serializer;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
+import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.test.context.EmbeddedKafka;
+import org.springframework.util.backoff.BackOff;
+import org.springframework.util.backoff.FixedBackOff;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static de.juplo.kafka.AbstractExampleConsumerTest.*;
+import static org.assertj.core.api.Assertions.assertThat;
+
+
+@SpringBootTest(
+ classes = {
+ KafkaAutoConfiguration.class,
+ ApplicationProperties.class,
+ AbstractExampleConsumerTest.ConsumerRunnableTestConfig.class,
+ },
+ properties = {
+ "juplo.consumer.poll-request-timeout=" + POLL_REQUEST_TIMEOUT_MS + "ms",
+ "juplo.consumer.max-time-per-record=" + MAX_TIME_PER_RECORD_MS + "ms",
+ "juplo.consumer.min-slack-per-poll-interval=" + MIN_SLACK_PER_POLL_INTERVAL_MS + "ms",
+ "juplo.consumer.num-retries=" + NUM_RETRIES,
+ "spring.kafka.consumer.auto-offset-reset=earliest",
+ "spring.kafka.consumer.properties.max.poll.interval.ms=" + MAX_POLL_INTERVALL_MS,
+ "spring.kafka.consumer.fetch-max-wait=" + FETCH_MAX_WAIT_MS + "ms",
+ "spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.ByteArraySerializer",
+ "logging.level.de.juplo.kafka=TRACE",
+ })
+@EmbeddedKafka(topics = TOPIC, partitions = NUM_PARTITIONS)
+public abstract class AbstractExampleConsumerTest<V>
+{
+ @DisplayName("All messages are consumed as expected")
+ @Test
+ void testOnlyValidMessages()
+ {
+ createExampleConsumer();
+
+ sendValidMessage(0);
+ sendValidMessage(1);
+ sendValidMessage(2);
+ sendValidMessage(3);
+ sendValidMessage(4);
+ sendValidMessage(5);
+ sendValidMessage(6);
+ sendValidMessage(7);
+ sendValidMessage(8);
+ sendValidMessage(9);
+ sendValidMessage(0);
+ sendValidMessage(1);
+ sendValidMessage(2);
+ sendValidMessage(3);
+ sendValidMessage(4);
+ sendValidMessage(5);
+ sendValidMessage(6);
+ sendValidMessage(7);
+ sendValidMessage(8);
+ sendValidMessage(9);
+
+ Awaitility
+ .await("All messages are consumed")
+ .atMost(Duration.ofSeconds(5))
+ .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(20));
+ }
+
+ @DisplayName("Delayed messages are consumed as expected")
+ @ParameterizedTest(name = "delay for message-consumption: {0}")
+ @ValueSource(ints = { 10, 25, 50, 75, 100 })
+ void testOnlyValidMessagesButAllDelayed(int delay)
+ {
+ createExampleConsumer();
+ mockRecordHandler.normalRecordHandlingDelay = Duration.ofMillis(delay);
+
+ sendValidMessage(0);
+ sendValidMessage(1);
+ sendValidMessage(2);
+ sendValidMessage(3);
+ sendValidMessage(4);
+ sendValidMessage(5);
+ sendValidMessage(6);
+ sendValidMessage(7);
+ sendValidMessage(8);
+ sendValidMessage(9);
+ sendValidMessage(0);
+ sendValidMessage(1);
+ sendValidMessage(2);
+ sendValidMessage(3);
+ sendValidMessage(4);
+ sendValidMessage(5);
+ sendValidMessage(6);
+ sendValidMessage(7);
+ sendValidMessage(8);
+ sendValidMessage(9);
+
+ Awaitility
+ .await("All messages are consumed")
+ .atMost(Duration.ofSeconds(5))
+ .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(20));
+ }
+
+ @DisplayName("A deserialization exception is skipped and all valid messages are consumed")
+ @Test
+ void testDeserializationException()
+ {
+ createExampleConsumer();
+
+ sendValidMessage(0);
+ sendValidMessage(1);
+ sendValidMessage(2);
+ sendNonDeserializableMessage(3);
+ sendValidMessage(4);
+ sendValidMessage(5);
+ sendValidMessage(6);
+ sendValidMessage(7);
+ sendValidMessage(8);
+ sendValidMessage(9);
+ sendValidMessage(0);
+ sendValidMessage(1);
+ sendValidMessage(2);
+ sendValidMessage(3);
+ sendValidMessage(4);
+ sendValidMessage(5);
+ sendValidMessage(6);
+ sendValidMessage(7);
+ sendValidMessage(8);
+ sendValidMessage(9);
+
+ Awaitility
+ .await("All valid messages are consumed")
+ .atMost(Duration.ofSeconds(15))
+ .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(19));
+ }
+
+ @DisplayName("A message, that triggers an unexpected exception in the domain-logic, exits the application")
+ @Test
+ void testUnexpectedDomainError() throws Exception
+ {
+ createExampleConsumer();
+
+ sendValidMessage(0);
+ sendValidMessage(1);
+ sendValidMessage(2);
+ sendValidMessage(3);
+ sendValidMessage(4);
+ sendValidMessage(5);
+ sendValidMessage(6);
+ sendValidMessage(7);
+ sendValidMessage(8);
+ sendValidMessage(9);
+ sendValidMessage(0);
+ sendValidMessage(1);
+ sendValidMessage(2);
+ sendMessageThatTriggersRuntimeExceptionInDomain(3);
+ sendValidMessage(4);
+ sendValidMessage(5);
+ sendValidMessage(6);
+ sendValidMessage(7);
+ sendValidMessage(8);
+ sendValidMessage(9);
+
+ Awaitility
+ .await("The ConsumerRunnable is exited by an unexpected exception")
+ .atMost(Duration.ofSeconds(5))
+ .pollInterval(Duration.ofMillis(250))
+ .untilAsserted(() -> assertThat(isTerminatedExceptionally.get()).isTrue());
+ }
+
+ @DisplayName("A message, that triggers an non-retryable exception in the domain-logic, is skipped and all other messages are consumed")
+ @Test
+ void testNonRetryableDomainError() throws Exception
+ {
+ createExampleConsumer();
+
+ sendValidMessage(0);
+ sendValidMessage(1);
+ sendValidMessage(2);
+ sendMessageThatTriggersNonRetryableExceptionInDomain(3);
+ sendValidMessage(4);
+ sendValidMessage(5);
+ sendValidMessage(6);
+ sendValidMessage(7);
+ sendValidMessage(8);
+ sendValidMessage(9);
+ sendValidMessage(0);
+ sendValidMessage(1);
+ sendValidMessage(2);
+ sendValidMessage(3);
+ sendValidMessage(4);
+ sendValidMessage(5);
+ sendValidMessage(6);
+ sendValidMessage(7);
+ sendValidMessage(8);
+ sendValidMessage(9);
+
+ Awaitility
+ .await("All other valid messages are consumed")
+ .atMost(Duration.ofSeconds(15))
+ .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(19));
+ }
+
+ @DisplayName("A message, that triggers a retryable exception in the domain-logic, is retried and all messages are eventually consumed")
+ @ParameterizedTest(name = "Number of failures for the 1. message in partition 3: {0}")
+ @ValueSource(ints = { 1, 2, 3, 4, 5, 6 })
+ void testOneMessageCausesRetryableDomainErrors(int numFailures)
+ {
+ createExampleConsumer();
+
+ sendValidMessage(0);
+ sendValidMessage(1);
+ sendValidMessage(2);
+ sendMessageThatTriggersRetryableExceptionInDomain(3,numFailures);
+ sendValidMessage(4);
+ sendValidMessage(5);
+ sendValidMessage(6);
+ sendValidMessage(7);
+ sendValidMessage(8);
+ sendValidMessage(9);
+ sendValidMessage(0);
+ sendValidMessage(1);
+ sendValidMessage(2);
+ sendValidMessage(3);
+ sendValidMessage(4);
+ sendValidMessage(5);
+ sendValidMessage(6);
+ sendValidMessage(7);
+ sendValidMessage(8);
+ sendValidMessage(9);
+
+ Awaitility
+ .await("All messages are eventually consumed")
+ .atMost(Duration.ofSeconds(15))
+ .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(20));
+ }
+
+ @DisplayName("All messages on a partition are delayed and a message, that triggers a retryable exception in the domain-logic, is retried and all messages are eventually consumed")
+ @ParameterizedTest(name = "Delay for normal messages: {0}ms")
+ @ValueSource(ints = { 10, 20, 50, 100, 150, 200 })
+ void testOneMessageCausesRetryableDomainErrorsWhileAllMessagesAreDelayed(int delay)
+ {
+ createExampleConsumer();
+ mockRecordHandler.normalRecordHandlingDelay = Duration.ofMillis(delay);
+ mockRecordHandler.exceptionalRecordHandlingDelay = Duration.ofMillis(100);
+
+ sendValidMessage(3);
+ sendValidMessage(3);
+ sendValidMessage(3);
+ sendValidMessage(3);
+ sendValidMessage(3);
+ sendValidMessage(3);
+ sendValidMessage(3);
+ sendValidMessage(3);
+ sendValidMessage(3);
+ sendValidMessage(3);
+ sendValidMessage(3);
+ sendValidMessage(3);
+ sendValidMessage(3);
+ sendValidMessage(3);
+ sendValidMessage(3);
+ sendValidMessage(3);
+ sendValidMessage(3);
+ sendValidMessage(3);
+ sendMessageThatTriggersRetryableExceptionInDomain(3, 1);
+ sendValidMessage(3);
+
+ Awaitility
+ .await("All messages are eventually consumed")
+ .atMost(Duration.ofSeconds(15))
+ .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(20));
+ }
+
+ @DisplayName("A message, that triggers a retryable exception in the domain-logic, but fails too often, is skipped and all other messages are eventually consumed")
+ @Test
+ void testOneMessageCausesRetryableDomainErrors()
+ {
+ createExampleConsumer();
+
+ sendValidMessage(0);
+ sendValidMessage(1);
+ sendValidMessage(2);
+ sendMessageThatTriggersRetryableExceptionInDomain(3,66);
+ sendValidMessage(4);
+ sendValidMessage(5);
+ sendValidMessage(6);
+ sendValidMessage(7);
+ sendValidMessage(8);
+ sendValidMessage(9);
+ sendValidMessage(0);
+ sendValidMessage(1);
+ sendValidMessage(2);
+ sendValidMessage(3);
+ sendValidMessage(4);
+ sendValidMessage(5);
+ sendValidMessage(6);
+ sendValidMessage(7);
+ sendValidMessage(8);
+ sendValidMessage(9);
+
+ Awaitility
+ .await("All other messages are eventually consumed")
+ .atMost(Duration.ofSeconds(15))
+ .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(19));
+ }
+
+ @DisplayName("Several messages are triggering retryable exception in one partition, some of them fail so often, that they are skipped and all successful messages are eventually consumed")
+ @ParameterizedTest(name = "Delay for errors: {0}ms")
+ @ValueSource(ints = { 10, 20, 50, 100, 150, 200 })
+ void testSeveralMessagesCausesRetryableDomainErrorsInOnePartition(int delay)
+ {
+ createExampleConsumer();
+ mockRecordHandler.exceptionalRecordHandlingDelay = Duration.ofMillis(delay);
+
+ sendValidMessage(3);
+ sendMessageThatTriggersRetryableExceptionInDomain(3, 4);
+ sendValidMessage(3);
+ sendValidMessage(3);
+ sendMessageThatTriggersRetryableExceptionInDomain(3, 6);
+ sendMessageThatTriggersRetryableExceptionInDomain(3, 1);
+ sendValidMessage(3);
+ sendMessageThatTriggersRetryableExceptionInDomain(3, 66);
+ sendMessageThatTriggersRetryableExceptionInDomain(3, 5);
+ sendValidMessage(3);
+ sendValidMessage(3);
+ sendMessageThatTriggersRetryableExceptionInDomain(3, 66);
+ sendMessageThatTriggersRetryableExceptionInDomain(3, 6);
+ sendMessageThatTriggersRetryableExceptionInDomain(3, 66);
+ sendMessageThatTriggersRetryableExceptionInDomain(3, 3);
+ sendValidMessage(3);
+ sendMessageThatTriggersRetryableExceptionInDomain(3, 66);
+ sendValidMessage(3);
+ sendMessageThatTriggersRetryableExceptionInDomain(3, 1);
+ sendValidMessage(3);
+
+ Awaitility
+ .await("All other messages are eventually consumed")
+ .atMost(Duration.ofSeconds(15))
+ .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(16));
+ }
+
+ @DisplayName("Three messages, that trigger retryable exceptions in the domain-logic, are retried and all messages are eventually consumed")
+ @ParameterizedTest(name = "Number of failures for the 1. message in partition 3: {0}, number of failures for the 1. message in partition 6: {1}, number of failures for the 2. message in partition 6: {2}")
+ @CsvSource({ "1,1,1", "6,3,4", "4,5,2", "1,2,3", "6,6,6" })
+ void testThreeMessagesCauseRetryableDomainErrors(
+ int numFailuresForMessageA,
+ int numFailuresForMessageB,
+ int numFailuresForMessageC)
+ {
+ createExampleConsumer();
+
+ sendValidMessage(0);
+ sendValidMessage(1);
+ sendValidMessage(2);
+ sendMessageThatTriggersRetryableExceptionInDomain(3,numFailuresForMessageA);
+ sendValidMessage(4);
+ sendValidMessage(5);
+ sendMessageThatTriggersRetryableExceptionInDomain(6,numFailuresForMessageB);
+ sendValidMessage(7);
+ sendValidMessage(8);
+ sendValidMessage(9);
+ sendValidMessage(0);
+ sendValidMessage(1);
+ sendValidMessage(2);
+ sendValidMessage(3);
+ sendValidMessage(4);
+ sendValidMessage(5);
+ sendMessageThatTriggersRetryableExceptionInDomain(6,numFailuresForMessageC);
+ sendValidMessage(7);
+ sendValidMessage(8);
+ sendValidMessage(9);
+ sendValidMessage(0);
+ sendValidMessage(1);
+ sendValidMessage(2);
+ sendValidMessage(3);
+ sendValidMessage(4);
+ sendValidMessage(5);
+ sendValidMessage(6);
+ sendValidMessage(7);
+ sendValidMessage(8);
+ sendValidMessage(9);
+
+ Awaitility
+ .await("All messages are eventually consumed")
+ .atMost(Duration.ofSeconds(20))
+ .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(30));
+ }
+
+ @DisplayName("Three messages, that trigger retryable exceptions in the domain-logic, are retried, but one of them fails too often and is skipped, still all other messages are eventually consumed")
+ @ParameterizedTest(name = "Number of failures for the 1. message in partition 3: {0}, number of failures for the 1. message in partition 6: {1}, number of failures for the 2. message in partition 6: {2}")
+ @CsvSource({ "66,3,4", "4,66,2", "1,2,66" })
+ void testThreeMessagesCauseRetryableDomainErrorsAndOneFailsTooOften(
+ int numFailuresForMessageA,
+ int numFailuresForMessageB,
+ int numFailuresForMessageC)
+ {
+ createExampleConsumer();
+
+ sendValidMessage(0);
+ sendValidMessage(1);
+ sendValidMessage(2);
+ sendMessageThatTriggersRetryableExceptionInDomain(3,numFailuresForMessageA);
+ sendValidMessage(4);
+ sendValidMessage(5);
+ sendMessageThatTriggersRetryableExceptionInDomain(6,numFailuresForMessageB);
+ sendValidMessage(7);
+ sendValidMessage(8);
+ sendValidMessage(9);
+ sendValidMessage(0);
+ sendValidMessage(1);
+ sendValidMessage(2);
+ sendValidMessage(3);
+ sendValidMessage(4);
+ sendValidMessage(5);
+ sendMessageThatTriggersRetryableExceptionInDomain(6,numFailuresForMessageC);
+ sendValidMessage(7);
+ sendValidMessage(8);
+ sendValidMessage(9);
+ sendValidMessage(0);
+ sendValidMessage(1);
+ sendValidMessage(2);
+ sendValidMessage(3);
+ sendValidMessage(4);
+ sendValidMessage(5);
+ sendValidMessage(6);
+ sendValidMessage(7);
+ sendValidMessage(8);
+ sendValidMessage(9);
+
+ Awaitility
+ .await("All other messages are eventually consumed")
+ .atMost(Duration.ofSeconds(20))
+ .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(29));
+ }
+
+ @DisplayName("A message, that triggers a retryable exception in the domain-logic, is retried 3 times with a fixed back-of and all messages are eventually consumed")
+ @ParameterizedTest(name = "Back-Off millis: {0}")
+ @ValueSource(ints = { 100, 250, 500, 1000 })
+ void testOneMessageIsRetriedWithFixedBackOff(int backOffMillis)
+ {
+ BackOff backOff = new FixedBackOff(backOffMillis, 3);
+ createExampleConsumer(backOff);
+
+ sendValidMessage(0);
+ sendValidMessage(1);
+ sendValidMessage(2);
+ sendMessageThatTriggersRetryableExceptionInDomain(3, 3);
+ sendValidMessage(4);
+ sendValidMessage(5);
+ sendValidMessage(6);
+ sendValidMessage(7);
+ sendValidMessage(8);
+ sendValidMessage(9);
+ sendValidMessage(0);
+ sendValidMessage(1);
+ sendValidMessage(2);
+ sendValidMessage(3);
+ sendValidMessage(4);
+ sendValidMessage(5);
+ sendValidMessage(6);
+ sendValidMessage(7);
+ sendValidMessage(8);
+ sendValidMessage(9);
+
+ Awaitility
+ .await("All messages are eventually consumed")
+ .atMost(Duration.ofSeconds(15))
+ .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(20));
+ }
+
+
+ static final String ID = "TEST";
+ static final String TOPIC = "ExampleConsumerTest_TEST";
+ static final int NUM_PARTITIONS = 10;
+ static final int NUM_RETRIES = 6;
+ static final int POLL_REQUEST_TIMEOUT_MS = 50;
+ static final int MAX_POLL_INTERVALL_MS = 500;
+ static final int MAX_TIME_PER_RECORD_MS = 100;
+ static final int FETCH_MAX_WAIT_MS = 50;
+ static final int MIN_SLACK_PER_POLL_INTERVAL_MS = 100;
+
+ @Autowired
+ KafkaTemplate<String, byte[]> kafkaTemplate;
+ @Autowired ApplicationProperties applicationProperties;
+ @Autowired KafkaProperties kafkaProperties;
+ @Autowired Clock clock;
+
+ final Serializer<V> serializer = createSerializer();
+ final long[] currentOffsets = new long[NUM_PARTITIONS];
+
+ long nextMessage = 1;
+
+ final AbstractMockRecordHandler mockRecordHandler = createMockRecordHandler();
+ final AtomicBoolean isTerminatedExceptionally = new AtomicBoolean();
+
+ ExampleConsumer exampleConsumer;
+
+
+ abstract Serializer<V> createSerializer();
+ abstract AbstractMockRecordHandler<V> createMockRecordHandler();
+ abstract Consumer<?, ?> createConsumer(KafkaProperties properties);
+ abstract V createValidMessage();
+ abstract V createMessageThatTriggersRuntimeException();
+ abstract V createMessageThatTriggersNonRetryableException();
+ abstract V createMessageThatTriggersRetryableException(int numFailures);
+
+
+ void createExampleConsumer()
+ {
+ createExampleConsumer(new FixedBackOff(0l, applicationProperties.getConsumerProperties().getNumRetries()));
+ }
+
+ void createExampleConsumer(BackOff backOff)
+ {
+ exampleConsumer = new ExampleConsumer(
+ ID,
+ TOPIC,
+ createConsumer(kafkaProperties),
+ mockRecordHandler,
+ clock,
+ applicationProperties.getConsumerProperties().getPollRequestTimeout(),
+ Duration.ofMillis(MAX_POLL_INTERVALL_MS),
+ applicationProperties.getConsumerProperties().getMaxTimePerRecord(),
+ applicationProperties.getConsumerProperties().getMinSlackPerPollInterval(),
+ backOff,
+ () -> isTerminatedExceptionally.set(true));
+ }
+
+ @BeforeEach
+ void resetParameters()
+ {
+ mockRecordHandler.normalRecordHandlingDelay = Duration.ofMillis(0);
+ mockRecordHandler.exceptionalRecordHandlingDelay = Duration.ofMillis(0);
+ }
+
+ @AfterEach
+ void resetSetup(@Autowired AdminClient adminClient) throws InterruptedException, ExecutionException
+ {
+ exampleConsumer.shutdown();
+ adminClient
+ .deleteRecords(recordsToDelete())
+ .all()
+ .get();
+ mockRecordHandler.clear();
+ nextMessage = 1;
+ isTerminatedExceptionally.set(false);
+ }
+
+ private Map<TopicPartition, RecordsToDelete> recordsToDelete()
+ {
+ return IntStream
+ .range(0, NUM_PARTITIONS)
+ .filter(i -> currentOffsets[i] > 0)
+ .mapToObj(i -> Integer.valueOf(i))
+ .collect(Collectors.toMap(
+ i -> new TopicPartition(TOPIC, i),
+ i -> recordsToDelete(i)));
+ }
+
+ private RecordsToDelete recordsToDelete(int partition)
+ {
+ return RecordsToDelete.beforeOffset(currentOffsets[partition] + 1);
+ }
+
+ private void sendValidMessage(int partition)
+ {
+ send(partition, createValidMessage());
+ }
+
+ private void sendNonDeserializableMessage(int partition)
+ {
+ send(partition, "BOOM!".getBytes());
+ }
+
+ private void sendMessageThatTriggersRuntimeExceptionInDomain(int partition)
+ {
+ send(partition, createMessageThatTriggersRuntimeException());
+ }
+
+ private void sendMessageThatTriggersNonRetryableExceptionInDomain(int partition)
+ {
+ send(partition, serializer.serialize(TOPIC, createMessageThatTriggersNonRetryableException()));
+ }
+
+ private void sendMessageThatTriggersRetryableExceptionInDomain(int partition, int numFailures)
+ {
+ send(partition, serializer.serialize(TOPIC, createMessageThatTriggersRetryableException(numFailures)));
+ }
+
+ private void send(int partition, V message)
+ {
+ send(partition, serializer.serialize(TOPIC, message));
+ }
+
+ private void send(int partition, byte[] bytes)
+ {
+ nextMessage++;
+ kafkaTemplate
+ .send(TOPIC, partition, "EGAL", bytes)
+ .thenAccept(result ->
+ {
+ RecordMetadata metadata = result.getRecordMetadata();
+ currentOffsets[metadata.partition()] = metadata.offset();
+ });
+ }
+
+
+
+ @TestConfiguration
+ static class ConsumerRunnableTestConfig
+ {
+ @Bean
+ AdminClient adminClient(@Value("${spring.embedded.kafka.brokers}") String kafkaBroker)
+ {
+ Map<String, Object> properties = new HashMap<>();
+ properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBroker);
+ return AdminClient.create(properties);
+ }
+
+ @Bean
+ Clock clock()
+ {
+ return Clock.systemDefaultZone();
+ }
+ }
+}
--- /dev/null
+package de.juplo.kafka;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+
+
+@RequiredArgsConstructor
+@Slf4j
+public abstract class AbstractMockRecordHandler<V> implements RecordHandler<String, V>
+{
+ final Map<OffsetInPartition, Integer> retriableErrors = new HashMap<>();
+
+ Duration normalRecordHandlingDelay;
+ Duration exceptionalRecordHandlingDelay;
+
+ private int numMessagesHandled = 0;
+
+ public int getNumMessagesHandled()
+ {
+ return numMessagesHandled;
+ }
+
+ @Override
+ public void handleRecord(
+ String topic,
+ Integer partition,
+ Long offset,
+ String key,
+ V value) throws RetryableErrorException, NonRetryableErrorException
+ {
+ generateError(new OffsetInPartition(offset, partition), value);
+ sleep(normalRecordHandlingDelay);
+ numMessagesHandled++;
+ log.trace("Handled {} messages so far", numMessagesHandled);
+ }
+
+ abstract void generateError(
+ OffsetInPartition offset,
+ V value) throws RetryableErrorException, NonRetryableErrorException;
+
+ void sleep(Duration duration)
+ {
+ try
+ {
+ Thread.sleep(duration);
+ }
+ catch(InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void clear()
+ {
+ retriableErrors.clear();
+ numMessagesHandled = 0;
+ }
+
+
+ record OffsetInPartition(long offset, int partition) {}
+}
--- /dev/null
+package de.juplo.kafka;
+
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.springframework.util.backoff.BackOff;
+import org.springframework.util.backoff.BackOffExecution;
+
+import java.time.Clock;
+import java.time.Instant;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.BDDMockito.given;
+
+
+@ExtendWith(MockitoExtension.class)
+class BackOffStateTest
+{
+ final static String ID = "TEST";
+ final static long OFFSET = 666;
+ final static Instant NOW = Instant.now();
+ final static long BACK_OFF = 1000l;
+
+
+ @Mock Clock clock;
+ @Mock BackOff backOff;
+ @Mock BackOffExecution backOffExecution;
+
+
+ private BackOffState NotStartedBackOffState()
+ {
+ // GIVEN
+
+ // WHEN
+ BackOffState backOffState = new BackOffState(ID, clock, backOff);
+
+ return backOffState;
+ }
+
+ @Test
+ @DisplayName("A not started BackOffState is not waiting for a retry")
+ void NotStartedBackOffStateIsNotWaitingForRetry()
+ {
+ // GIVEN
+ BackOffState backOffState = NotStartedBackOffState();
+
+ // WHEN
+
+ // THEN
+ assertThat(backOffState.isWaitingForNextRetry()).isFalse();
+ }
+
+ @Test
+ @DisplayName("A not started BackOffState is not started")
+ void NotStartedBackOffStateIsNotStarted()
+ {
+ // GIVEN
+ BackOffState backOffState = NotStartedBackOffState();
+
+ // WHEN
+
+ // THEN
+ assertThat(backOffState.isStarted(OFFSET)).isFalse();
+ }
+
+
+ private BackOffState StartedBackoffStateWithNoRetries()
+ {
+ // GIVEN
+ given(backOff.start()).willReturn(backOffExecution);
+ given(backOffExecution.nextBackOff()).willReturn(BackOffExecution.STOP);
+
+ // WHEN
+ BackOffState backOffState = new BackOffState(ID, clock, backOff);
+ backOffState.start(OFFSET);
+
+ return backOffState;
+ }
+
+ @Test
+ @DisplayName("A started BackOffState with no retries is not waiting for a retry")
+ void StartedBackOffStateWithNoRetriesIsNotWaitingForRetry()
+ {
+ // GIVEN
+ BackOffState backOffState = StartedBackoffStateWithNoRetries();
+
+ // WHEN
+
+ // THEN
+ assertThat(backOffState.isWaitingForNextRetry()).isFalse();
+ }
+
+ @Test
+ @DisplayName("A started BackOffState with no retries is started")
+ void StartedBackOffStateWithNoRetriesIsStarted()
+ {
+ // GIVEN
+ BackOffState backOffState = StartedBackoffStateWithNoRetries();
+
+ // WHEN
+
+ // THEN
+ assertThat(backOffState.isStarted(OFFSET)).isTrue();
+ }
+
+ @Test
+ @DisplayName("A started BackOffState with no retries is completed")
+ void StartedBackOffStateWithNoRetriesIsCompleted()
+ {
+ // GIVEN
+ BackOffState backOffState = StartedBackoffStateWithNoRetries();
+
+ // WHEN
+
+ // THEN
+ assertThat(backOffState.isCompleted(OFFSET)).isTrue();
+ }
+
+
+ private BackOffState StartedBackoffStateWithRetries()
+ {
+ // GIVEN
+ given(clock.instant()).willReturn(NOW);
+ given(backOff.start()).willReturn(backOffExecution);
+ given(backOffExecution.nextBackOff()).willReturn(BACK_OFF);
+
+ // WHEN
+ BackOffState backOffState = new BackOffState(ID, clock, backOff);
+ backOffState.start(OFFSET);
+
+ return backOffState;
+ }
+
+ @Test
+ @DisplayName("A started BackOffState is waiting for a retry if the time is not due")
+ void StartedBackOffStateIsWaitingForRetryIfTimeIsNotDue()
+ {
+ // GIVEN
+ BackOffState backOffState = StartedBackoffStateWithRetries();
+ given(clock.instant()).willReturn(NOW.plusMillis(BACK_OFF));
+
+ // WHEN
+ boolean result = backOffState.isWaitingForNextRetry();
+
+ // THEN
+ assertThat(result).isTrue();
+ }
+
+ @Test
+ @DisplayName("A started BackOffState is started if the time is not due")
+ void StartedBackOffStateIsStarted()
+ {
+ // GIVEN
+ BackOffState backOffState = StartedBackoffStateWithRetries();
+ given(clock.instant()).willReturn(NOW.plusMillis(BACK_OFF));
+
+ // WHEN
+ backOffState.isWaitingForNextRetry();
+
+ // THEN
+ assertThat(backOffState.isStarted(OFFSET)).isTrue();
+ }
+
+ @Test
+ @DisplayName("A started BackOffState is not completed if the time is not due")
+ void StartedBackOffStateIsNotCompletedIfTimeIsNotDue()
+ {
+ // GIVEN
+ BackOffState backOffState = StartedBackoffStateWithRetries();
+ given(clock.instant()).willReturn(NOW.plusMillis(BACK_OFF));
+
+ // WHEN
+ backOffState.isWaitingForNextRetry();
+
+ // THEN
+ assertThat(backOffState.isCompleted(OFFSET)).isFalse();
+ }
+
+ @Test
+ @DisplayName("A started BackOffState is not waiting for a retry if the time is due but the retry not yet completed")
+ void StartedBackOffStateIsNotWaitingForRetryIfTheTimeIsDueButRetryNotCompleted()
+ {
+ // GIVEN
+ BackOffState backOffState = StartedBackoffStateWithRetries();
+ given(clock.instant()).willReturn(NOW.plusMillis(BACK_OFF).plusMillis(1));
+ given(backOffExecution.nextBackOff()).willReturn(BACK_OFF);
+
+ // WHEN
+ boolean result = backOffState.isWaitingForNextRetry();
+
+ // THEN
+ assertThat(result).isFalse();
+ }
+
+ @Test
+ @DisplayName("A started BackOffState is started if the time is due but the retry not yet completed")
+ void StartedBackOffStateIsStartedIfTheTimeIsDueButRetryNotCompleted()
+ {
+ // GIVEN
+ BackOffState backOffState = StartedBackoffStateWithRetries();
+ given(clock.instant()).willReturn(NOW.plusMillis(BACK_OFF).plusMillis(1));
+ given(backOffExecution.nextBackOff()).willReturn(BACK_OFF);
+
+ // WHEN
+ backOffState.isWaitingForNextRetry();
+
+ // THEN
+ assertThat(backOffState.isStarted(OFFSET)).isTrue();
+ }
+
+ @Test
+ @DisplayName("A started BackOffState is not completed if the time is due but the retry not yet completed")
+ void StartedBackOffStateIsNotCompletedIfTheTimeIsDueButRetryNotCompleted()
+ {
+ // GIVEN
+ BackOffState backOffState = StartedBackoffStateWithRetries();
+ given(clock.instant()).willReturn(NOW.plusMillis(BACK_OFF).plusMillis(1));
+ given(backOffExecution.nextBackOff()).willReturn(BACK_OFF);
+
+ // WHEN
+ backOffState.isWaitingForNextRetry();
+
+ // THEN
+ assertThat(backOffState.isCompleted(OFFSET)).isFalse();
+ }
+
+ @Test
+ @DisplayName("A started BackOffState is not waiting for a retry if the time is due and the retry is completed")
+ void StartedBackOffStateIsNotWaitingForRetryIfTheTimeIsDueAndRetryIsCompleted()
+ {
+ // GIVEN
+ BackOffState backOffState = StartedBackoffStateWithRetries();
+ given(clock.instant()).willReturn(NOW.plusMillis(BACK_OFF).plusMillis(1));
+ given(backOffExecution.nextBackOff()).willReturn(BackOffExecution.STOP);
+
+ // WHEN
+ boolean result = backOffState.isWaitingForNextRetry();
+
+ // THEN
+ assertThat(result).isFalse();
+ }
+
+ @Test
+ @DisplayName("A started BackOffState is started if the time is due and the retry is completed")
+ void StartedBackOffStateIsStartedIfTheTimeIsDueAndRetryIsCompleted()
+ {
+ // GIVEN
+ BackOffState backOffState = StartedBackoffStateWithRetries();
+ given(clock.instant()).willReturn(NOW.plusMillis(BACK_OFF).plusMillis(1));
+ given(backOffExecution.nextBackOff()).willReturn(BackOffExecution.STOP);
+
+ // WHEN
+ backOffState.isWaitingForNextRetry();
+
+ // THEN
+ assertThat(backOffState.isStarted(OFFSET)).isTrue();
+ }
+
+ @Test
+ @DisplayName("A started BackOffState is completed if the time is due and the retry is completed")
+ void StartedBackOffStateIsCompletedIfTheTimeIsDueAndRetryIsCompleted()
+ {
+ // GIVEN
+ BackOffState backOffState = StartedBackoffStateWithRetries();
+ given(clock.instant()).willReturn(NOW.plusMillis(BACK_OFF).plusMillis(1));
+ given(backOffExecution.nextBackOff()).willReturn(BackOffExecution.STOP);
+
+ // WHEN
+ backOffState.isWaitingForNextRetry();
+
+ // THEN
+ assertThat(backOffState.isCompleted(OFFSET)).isTrue();
+ }
+
+ @Test
+ @DisplayName("A started BackOffState is not waiting for a retry after a reset")
+ void StartedBackOffStateIsNotWaitingForRetryAfterReset()
+ {
+ // GIVEN
+ BackOffState backOffState = StartedBackoffStateWithRetries();
+
+ // WHEN
+ backOffState.reset();
+
+ // THEN
+ assertThat(backOffState.isWaitingForNextRetry()).isFalse();
+ }
+
+ @Test
+ @DisplayName("A started BackOffState is not started after a reset")
+ void StartedBackOffStateIsNotStartedAfterReset()
+ {
+ // GIVEN
+ BackOffState backOffState = StartedBackoffStateWithRetries();
+
+ // WHEN
+ backOffState.reset();
+
+ // THEN
+ assertThat(backOffState.isStarted(OFFSET)).isFalse();
+ }
+}
--- /dev/null
+package de.juplo.kafka;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
+
+import java.util.Map;
+
+
+public class LongExampleConsumerTest extends AbstractExampleConsumerTest<Long>
+{
+ @Override
+ AbstractMockRecordHandler<Long> createMockRecordHandler()
+ {
+ return new LongMockRecordHandler();
+ }
+
+ @Override
+ Serializer<Long> createSerializer()
+ {
+ return new LongSerializer();
+ }
+
+ @Override
+ Consumer<?, ?> createConsumer(KafkaProperties kafkaProperties)
+ {
+ Map<String, Object> properties = kafkaProperties.buildConsumerProperties();
+ properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
+ return new DefaultKafkaConsumerFactory<>(properties).createConsumer();
+ }
+
+ @Override
+ Long createValidMessage()
+ {
+ return nextMessage;
+ }
+
+ @Override
+ Long createMessageThatTriggersRuntimeException()
+ {
+ return VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION;
+ }
+
+ @Override
+ Long createMessageThatTriggersNonRetryableException()
+ {
+ return VALUE_THAT_TRIGGERS_NONRETRYABLEEXCEPTION;
+ }
+
+ @Override
+ Long createMessageThatTriggersRetryableException(int numFailures)
+ {
+ return VALUE_THAT_TRIGGERS_RETRYABLEEXCEPTION * numFailures;
+ }
+
+
+ public final static long VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION = -1;
+ public final static long VALUE_THAT_TRIGGERS_RETRYABLEEXCEPTION = -2;
+ public final static long VALUE_THAT_TRIGGERS_NONRETRYABLEEXCEPTION = -3;
+}
--- /dev/null
+package de.juplo.kafka;
+
+import lombok.extern.slf4j.Slf4j;
+
+import static de.juplo.kafka.LongExampleConsumerTest.*;
+
+
+@Slf4j
+public class LongMockRecordHandler extends AbstractMockRecordHandler<Long>
+{
+ @Override
+ void generateError(
+ OffsetInPartition offset,
+ Long value) throws RetryableErrorException, NonRetryableErrorException
+ {
+ if (value == null || value > -1)
+ {
+ // Valid message...
+ return;
+ }
+
+ if (value == VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION)
+ {
+ throw new RuntimeException("Unexpected application error!");
+ }
+
+ if (value == VALUE_THAT_TRIGGERS_NONRETRYABLEEXCEPTION)
+ {
+ throw new NonRetryableErrorException("Non-Retryable application error!");
+ }
+
+ if ((float)value % (float) VALUE_THAT_TRIGGERS_RETRYABLEEXCEPTION == 0f)
+ {
+ int totalOccurrences = (int) (value / VALUE_THAT_TRIGGERS_RETRYABLEEXCEPTION);
+ int occurrence = retriableErrors.compute(
+ offset,
+ (k, v) ->
+ {
+ if (v == null)
+ {
+ v = totalOccurrences;
+ }
+ else
+ {
+ v--;
+ }
+
+ return v;
+ });
+
+ if (occurrence <= 0)
+ {
+ retriableErrors.remove(offset);
+ }
+ else
+ {
+ log.debug(
+ "Simulating occurrence #{} of {} for a retryable error at offset {} in partition {}",
+ totalOccurrences - occurrence + 1,
+ totalOccurrences,
+ offset.offset(),
+ offset.partition());
+ sleep(exceptionalRecordHandlingDelay);
+ throw new RetryableErrorException("Retryable application error! Occurrence #" + (totalOccurrences - occurrence + 1));
+ }
+
+ log.info("Simulating a resolved retryable error after {} occurrences of the error", totalOccurrences);
+ }
+ else
+ {
+ log.warn("Not specifically mapped error: {}", value);
+ }
+ }
+}