import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.util.backoff.BackOff;
+import org.springframework.util.backoff.FixedBackOff;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.util.Properties;
@Configuration
@Slf4j
public class ApplicationConfiguration
{
+ public final static String MAX_POLL_INTERVALL_CONFIG_KEY = "max.poll.interval.ms";
+ public final static Duration MAX_POLL_INTERVALL_DEFAULT_VALUE = Duration.ofMinutes(5);
@Bean
public ExampleConsumer<String, Long> exampleConsumer(
Consumer<String, Long> kafkaConsumer,
RecordHandler<String, Long> recordHandler,
ApplicationProperties properties,
KafkaProperties kafkaProperties,
+ Clock clock,
+ BackOff backOffStrategy,
ConfigurableApplicationContext applicationContext)
{
+ String maxPollIntervalMs = kafkaProperties
+ .getConsumer()
+ .getProperties()
+ .get(MAX_POLL_INTERVALL_CONFIG_KEY);
+ Duration maxPollInterval = maxPollIntervalMs == null
+ ? MAX_POLL_INTERVALL_DEFAULT_VALUE
+ : Duration.ofMillis(Integer.valueOf(maxPollIntervalMs));
+
return
new ExampleConsumer<>(
kafkaProperties.getClientId(),
properties.getConsumerProperties().getTopic(),
kafkaConsumer,
recordHandler,
+ clock,
+ properties.getConsumerProperties().getPollRequestTimeout(),
+ maxPollInterval,
+ properties.getConsumerProperties().getMaxTimePerRecord(),
+ properties.getConsumerProperties().getMinSlackPerPollInterval(),
+ backOffStrategy,
() -> applicationContext.close());
}
return (topic, partition, offset, key, value) -> log.info("No-Ops Handler called for {}={}", key, value);
}
+ @Bean
+ public BackOff backOffStrategy(ApplicationProperties properties)
+ {
+ return new FixedBackOff(0l, properties.getConsumerProperties().getNumRetries());
+ }
+
+ @Bean
+ public Clock clock()
+ {
+ return Clock.systemDefaultZone();
+ }
+
@Bean(destroyMethod = "")
public Consumer<?, ?> kafkaConsumer(ConsumerFactory<?, ?> consumerFactory)
{
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.validation.annotation.Validated;
+import java.time.Duration;
+
@ConfigurationProperties(prefix = "juplo")
@Validated
@NotNull
@NotEmpty
private String topic;
+ @NotNull
+ private Duration pollRequestTimeout;
+ @NotNull
+ private Duration maxTimePerRecord;
+ @NotNull
+ private Duration minSlackPerPollInterval;
+ @NotNull
+ private int numRetries;
}
}
--- /dev/null
+package de.juplo.kafka;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.util.backoff.BackOff;
+import org.springframework.util.backoff.BackOffExecution;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+
+
+@RequiredArgsConstructor
+@Slf4j
+class BackOffState
+{
+ private final String logPrefix;
+ private final Clock clock;
+ private final BackOff backOffStrategy;
+
+ @Getter
+ private long offset;
+ private BackOffExecution backOffExecution;
+ private int numRetries = 0;
+ private Instant timeNextRetryIsDue;
+
+
+ void start(long offset)
+ {
+ this.offset = offset;
+ log.info("{} - Back-Off requested for offset={}", logPrefix, offset);
+ backOffExecution = backOffStrategy.start();
+ initializeNextBackOff();
+ }
+
+ boolean isWaitingForNextRetry()
+ {
+ if (timeNextRetryIsDue == null)
+ {
+ return false;
+ }
+
+ Instant now = clock.instant();
+ Duration remaining = Duration.between(now, timeNextRetryIsDue);
+ if (remaining.isNegative())
+ {
+ log.info(
+ "{} - {}. retry for offset={}, lateness: {}",
+ logPrefix,
+ numRetries,
+ offset,
+ remaining.abs());
+ initializeNextBackOff();
+ return false;
+ }
+ else
+ {
+ log.info(
+ "{} - Next retry for offset={} is due in {}",
+ logPrefix,
+ offset,
+ remaining);
+ return true;
+ }
+ }
+
+ boolean isStarted(long offset)
+ {
+ return this.offset == offset && backOffExecution != null;
+ }
+
+ boolean isCompleted(long offset)
+ {
+ return this.offset == offset && timeNextRetryIsDue == null;
+ }
+
+ void reset()
+ {
+ timeNextRetryIsDue = null;
+ offset = -1l;
+ }
+
+ private void initializeNextBackOff()
+ {
+ long backOffMillis = backOffExecution.nextBackOff();
+
+ if (backOffMillis == BackOffExecution.STOP)
+ {
+ timeNextRetryIsDue = null;
+ }
+ else
+ {
+ numRetries++;
+ timeNextRetryIsDue = clock.instant().plusMillis(backOffMillis);
+ }
+ }
+}
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RecordDeserializationException;
import org.apache.kafka.common.errors.WakeupException;
+import org.springframework.util.backoff.BackOff;
+import java.time.Clock;
import java.time.Duration;
+import java.time.Instant;
import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
@Slf4j
-public class ExampleConsumer<K, V> implements Runnable
+public class ExampleConsumer<K, V> implements ConsumerRebalanceListener, Runnable
{
private final String id;
private final String topic;
private final Consumer<K, V> consumer;
private final RecordHandler<K, V> recordHandler;
private final Thread workerThread;
+ private final Clock clock;
+ private final Duration pollRequestTimeout;
+ private final Duration maxPollInterval;
+ private final Duration minTimeForNextRecord;
+ private final BackOff backOffStrategy;
+ private final BackOffState[] backOffState;
private final Runnable closeCallback;
private long consumed = 0;
String topic,
Consumer<K, V> consumer,
RecordHandler<K, V> recordHandler,
+ Clock clock,
+ Duration pollRequestTimeout,
+ Duration maxPollInterval,
+ Duration maxTimePerRecord,
+ Duration minSlackPerPollInterval,
+ BackOff backOffStrategy,
Runnable closeCallback)
{
this.id = clientId;
this.topic = topic;
this.consumer = consumer;
this.recordHandler = recordHandler;
+ this.clock = clock;
+ this.pollRequestTimeout = pollRequestTimeout;
+ this.maxPollInterval = maxPollInterval;
+ this.minTimeForNextRecord = maxTimePerRecord.plus(minSlackPerPollInterval);
+ this.backOffStrategy = backOffStrategy;
+
+ int numPartitions = consumer.partitionsFor(topic).size();
+ log.info("{} - Topic {} has {} partitions", id, topic, numPartitions);
+ this.backOffState = new BackOffState[numPartitions];
workerThread = new Thread(this, "ExampleConsumer Worker-Thread");
workerThread.start();
try
{
log.info("{} - Subscribing to topic {}", id, topic);
- consumer.subscribe(Arrays.asList(topic));
+ consumer.subscribe(Arrays.asList(topic), this);
while (true)
{
try
{
- ConsumerRecords<K, V> records = consumer.poll(Duration.ofSeconds(1));
+ ConsumerRecords<K, V> records = consumer.poll(pollRequestTimeout);
log.info("{} - Received {} messages", id, records.count());
- for (ConsumerRecord<K, V> record : records)
+
+ Instant deadline = clock.instant().plus(maxPollInterval);
+ boolean abortCurrentPoll = false;
+
+ for (TopicPartition topicPartition : records.partitions())
{
- handleRecord(
- record.topic(),
- record.partition(),
- record.offset(),
- record.key(),
- record.value());
+ if (backOffState[topicPartition.partition()].isWaitingForNextRetry())
+ {
+ log.info("{} - {} is blocked, because it is waiting for a retry", id, topicPartition);
+ consumer.seek(topicPartition, backOffState[topicPartition.partition()].getOffset());
+ continue;
+ }
+
+ List<ConsumerRecord<K, V>> recordsForPartition = records.records(topicPartition);
+ log.debug(
+ "{} - Received {} messages for partition {}",
+ id,
+ recordsForPartition.size(),
+ topicPartition);
+
+ for (ConsumerRecord<K, V> record : recordsForPartition)
+ {
+ if (abortCurrentPoll)
+ {
+ consumer.seek(topicPartition, record.offset());
+ break;
+ }
+
+ Instant now = clock.instant();
+ Duration timeLeft = Duration.between(now, deadline);
+ log.trace("{} - Time left for current poll: {}", id, timeLeft);
+
+ if (timeLeft.minus(minTimeForNextRecord).isNegative())
+ {
+ log.info(
+ "{} - Aborting record handling, because only {} are left until the poll-interval expires!",
+ id,
+ timeLeft);
+ abortCurrentPoll = true;
+ consumer.seek(topicPartition, record.offset());
+ break;
+ }
+
+ try
+ {
+ handleRecord(
+ record.topic(),
+ record.partition(),
+ record.offset(),
+ record.key(),
+ record.value());
+ }
+ catch (RetryableErrorException e)
+ {
+ // Seeking to the offset of the record, that raised the exception, and
+ // leaving the loop afterwards, retries the record
+ int partition = topicPartition.partition();
+ if (!backOffState[partition].isStarted(record.offset()))
+ {
+ log.info("{} - First occurrence of a retryable error: {}", id, e.toString());
+ backOffState[partition].start(record.offset());
+ consumer.seek(topicPartition, record.offset());
+ break;
+ }
+ else
+ {
+ if (backOffState[partition].isCompleted(record.offset()))
+ {
+ log.warn("{} - Ignoring retryable error: {}", id, e.toString());
+ }
+ else
+ {
+ log.info(
+ "{} - Retry in progress for offset={} in {}, error: {}",
+ id,
+ record.offset(),
+ partition,
+ e.toString());
+ consumer.seek(topicPartition, record.offset());
+ break;
+ }
+ }
+ }
+ catch (NonRetryableErrorException e)
+ {
+ // Just ignore, to skip
+ log.warn("{} - Ignoring non-retryable error: {}", id, e.toString());
+ }
+
+ backOffState[topicPartition.partition()].reset();
+ }
}
}
catch (RecordDeserializationException e)
Integer partition,
Long offset,
K key,
- V value)
+ V value) throws RetryableErrorException, NonRetryableErrorException
{
consumed++;
log.info("{} - partition={}-{}, offset={}: {}={}", id, topic, partition, offset, key, value);
}
+ @Override
+ public void onPartitionsAssigned(Collection<TopicPartition> partitions)
+ {
+ partitions.forEach(topicPartition ->
+ backOffState[topicPartition.partition()] = new BackOffState(
+ id + " - partition=" + topicPartition.partition(),
+ clock,
+ backOffStrategy));
+ }
+
+ @Override
+ public void onPartitionsRevoked(Collection<TopicPartition> partitions)
+ {
+ partitions.forEach(tp -> backOffState[tp.partition()] = null);
+ }
+
+
public void shutdown() throws InterruptedException
{
log.info("{} - Waking up the consumer", id);
--- /dev/null
+package de.juplo.kafka;
+
+public class NonRetryableErrorException extends Exception
+{
+ public NonRetryableErrorException(String message)
+ {
+ super(message);
+ }
+}
Integer partition,
Long offset,
K key,
- V value);
+ V value) throws RetryableErrorException, NonRetryableErrorException;
}
--- /dev/null
+package de.juplo.kafka;
+
+public class RetryableErrorException extends Exception
+{
+ public RetryableErrorException(String message)
+ {
+ super(message);
+ }
+}
juplo:
consumer:
topic: test
+ poll-request-timeout: 1s
+ max-time-per-record: 30s
+ min-slack-per-poll-interval: 1s
+ num-retries: 10
management:
endpoint:
shutdown:
consumer:
group-id: my-group
value-deserializer: org.apache.kafka.common.serialization.LongDeserializer
+ auto-offset-reset: earliest
+ auto-commit-interval: 5s
+ max-poll-records: 500
+ fetch-max-wait: 500ms
+ properties:
+ max.poll.interval.ms: 300000
logging:
level:
root: INFO
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.ValueSource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.context.EmbeddedKafka;
+import org.springframework.util.backoff.BackOff;
+import org.springframework.util.backoff.FixedBackOff;
+import java.time.Clock;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
-import static de.juplo.kafka.AbstractExampleConsumerTest.NUM_PARTITIONS;
-import static de.juplo.kafka.AbstractExampleConsumerTest.TOPIC;
+import static de.juplo.kafka.AbstractExampleConsumerTest.*;
import static org.assertj.core.api.Assertions.assertThat;
AbstractExampleConsumerTest.ConsumerRunnableTestConfig.class,
},
properties = {
+ "juplo.consumer.poll-request-timeout=" + POLL_REQUEST_TIMEOUT_MS + "ms",
+ "juplo.consumer.max-time-per-record=" + MAX_TIME_PER_RECORD_MS + "ms",
+ "juplo.consumer.min-slack-per-poll-interval=" + MIN_SLACK_PER_POLL_INTERVAL_MS + "ms",
+ "juplo.consumer.num-retries=" + NUM_RETRIES,
"spring.kafka.consumer.auto-offset-reset=earliest",
+ "spring.kafka.consumer.properties.max.poll.interval.ms=" + MAX_POLL_INTERVALL_MS,
+ "spring.kafka.consumer.fetch-max-wait=" + FETCH_MAX_WAIT_MS + "ms",
"spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.ByteArraySerializer",
"logging.level.de.juplo.kafka=TRACE",
})
@Test
void testOnlyValidMessages()
{
+ createExampleConsumer();
+
+ sendValidMessage(0);
+ sendValidMessage(1);
+ sendValidMessage(2);
+ sendValidMessage(3);
+ sendValidMessage(4);
+ sendValidMessage(5);
+ sendValidMessage(6);
+ sendValidMessage(7);
+ sendValidMessage(8);
+ sendValidMessage(9);
+ sendValidMessage(0);
+ sendValidMessage(1);
+ sendValidMessage(2);
+ sendValidMessage(3);
+ sendValidMessage(4);
+ sendValidMessage(5);
+ sendValidMessage(6);
+ sendValidMessage(7);
+ sendValidMessage(8);
+ sendValidMessage(9);
+
+ Awaitility
+ .await("All messages are consumed")
+ .atMost(Duration.ofSeconds(5))
+ .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(20));
+ }
+
+ @DisplayName("Delayed messages are consumed as expected")
+ @ParameterizedTest(name = "delay for message-consumption: {0}")
+ @ValueSource(ints = { 10, 25, 50, 75, 100 })
+ void testOnlyValidMessagesButAllDelayed(int delay)
+ {
+ createExampleConsumer();
+ mockRecordHandler.normalRecordHandlingDelay = Duration.ofMillis(delay);
+
sendValidMessage(0);
sendValidMessage(1);
sendValidMessage(2);
@Test
void testDeserializationException()
{
+ createExampleConsumer();
+
sendValidMessage(0);
sendValidMessage(1);
sendValidMessage(2);
@Test
void testUnexpectedDomainError() throws Exception
{
+ createExampleConsumer();
+
sendValidMessage(0);
sendValidMessage(1);
sendValidMessage(2);
.untilAsserted(() -> assertThat(isTerminatedExceptionally.get()).isTrue());
}
+ @DisplayName("A message, that triggers an non-retryable exception in the domain-logic, is skipped and all other messages are consumed")
+ @Test
+ void testNonRetryableDomainError() throws Exception
+ {
+ createExampleConsumer();
+
+ sendValidMessage(0);
+ sendValidMessage(1);
+ sendValidMessage(2);
+ sendMessageThatTriggersNonRetryableExceptionInDomain(3);
+ sendValidMessage(4);
+ sendValidMessage(5);
+ sendValidMessage(6);
+ sendValidMessage(7);
+ sendValidMessage(8);
+ sendValidMessage(9);
+ sendValidMessage(0);
+ sendValidMessage(1);
+ sendValidMessage(2);
+ sendValidMessage(3);
+ sendValidMessage(4);
+ sendValidMessage(5);
+ sendValidMessage(6);
+ sendValidMessage(7);
+ sendValidMessage(8);
+ sendValidMessage(9);
+
+ Awaitility
+ .await("All other valid messages are consumed")
+ .atMost(Duration.ofSeconds(15))
+ .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(19));
+ }
+
+ @DisplayName("A message, that triggers a retryable exception in the domain-logic, is retried and all messages are eventually consumed")
+ @ParameterizedTest(name = "Number of failures for the 1. message in partition 3: {0}")
+ @ValueSource(ints = { 1, 2, 3, 4, 5, 6 })
+ void testOneMessageCausesRetryableDomainErrors(int numFailures)
+ {
+ createExampleConsumer();
+
+ sendValidMessage(0);
+ sendValidMessage(1);
+ sendValidMessage(2);
+ sendMessageThatTriggersRetryableExceptionInDomain(3,numFailures);
+ sendValidMessage(4);
+ sendValidMessage(5);
+ sendValidMessage(6);
+ sendValidMessage(7);
+ sendValidMessage(8);
+ sendValidMessage(9);
+ sendValidMessage(0);
+ sendValidMessage(1);
+ sendValidMessage(2);
+ sendValidMessage(3);
+ sendValidMessage(4);
+ sendValidMessage(5);
+ sendValidMessage(6);
+ sendValidMessage(7);
+ sendValidMessage(8);
+ sendValidMessage(9);
+
+ Awaitility
+ .await("All messages are eventually consumed")
+ .atMost(Duration.ofSeconds(15))
+ .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(20));
+ }
+
+ @DisplayName("All messages on a partition are delayed and a message, that triggers a retryable exception in the domain-logic, is retried and all messages are eventually consumed")
+ @ParameterizedTest(name = "Delay for normal messages: {0}ms")
+ @ValueSource(ints = { 10, 20, 50, 100, 150, 200 })
+ void testOneMessageCausesRetryableDomainErrorsWhileAllMessagesAreDelayed(int delay)
+ {
+ createExampleConsumer();
+ mockRecordHandler.normalRecordHandlingDelay = Duration.ofMillis(delay);
+ mockRecordHandler.exceptionalRecordHandlingDelay = Duration.ofMillis(100);
+
+ sendValidMessage(3);
+ sendValidMessage(3);
+ sendValidMessage(3);
+ sendValidMessage(3);
+ sendValidMessage(3);
+ sendValidMessage(3);
+ sendValidMessage(3);
+ sendValidMessage(3);
+ sendValidMessage(3);
+ sendValidMessage(3);
+ sendValidMessage(3);
+ sendValidMessage(3);
+ sendValidMessage(3);
+ sendValidMessage(3);
+ sendValidMessage(3);
+ sendValidMessage(3);
+ sendValidMessage(3);
+ sendValidMessage(3);
+ sendMessageThatTriggersRetryableExceptionInDomain(3, 1);
+ sendValidMessage(3);
+
+ Awaitility
+ .await("All messages are eventually consumed")
+ .atMost(Duration.ofSeconds(15))
+ .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(20));
+ }
+
+ @DisplayName("A message, that triggers a retryable exception in the domain-logic, but fails too often, is skipped and all other messages are eventually consumed")
+ @Test
+ void testOneMessageCausesRetryableDomainErrors()
+ {
+ createExampleConsumer();
+
+ sendValidMessage(0);
+ sendValidMessage(1);
+ sendValidMessage(2);
+ sendMessageThatTriggersRetryableExceptionInDomain(3,66);
+ sendValidMessage(4);
+ sendValidMessage(5);
+ sendValidMessage(6);
+ sendValidMessage(7);
+ sendValidMessage(8);
+ sendValidMessage(9);
+ sendValidMessage(0);
+ sendValidMessage(1);
+ sendValidMessage(2);
+ sendValidMessage(3);
+ sendValidMessage(4);
+ sendValidMessage(5);
+ sendValidMessage(6);
+ sendValidMessage(7);
+ sendValidMessage(8);
+ sendValidMessage(9);
+
+ Awaitility
+ .await("All other messages are eventually consumed")
+ .atMost(Duration.ofSeconds(15))
+ .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(19));
+ }
+
+ @DisplayName("Several messages are triggering retryable exception in one partition, some of them fail so often, that they are skipped and all successful messages are eventually consumed")
+ @ParameterizedTest(name = "Delay for errors: {0}ms")
+ @ValueSource(ints = { 10, 20, 50, 100, 150, 200 })
+ void testSeveralMessagesCausesRetryableDomainErrorsInOnePartition(int delay)
+ {
+ createExampleConsumer();
+ mockRecordHandler.exceptionalRecordHandlingDelay = Duration.ofMillis(delay);
+
+ sendValidMessage(3);
+ sendMessageThatTriggersRetryableExceptionInDomain(3, 4);
+ sendValidMessage(3);
+ sendValidMessage(3);
+ sendMessageThatTriggersRetryableExceptionInDomain(3, 6);
+ sendMessageThatTriggersRetryableExceptionInDomain(3, 1);
+ sendValidMessage(3);
+ sendMessageThatTriggersRetryableExceptionInDomain(3, 66);
+ sendMessageThatTriggersRetryableExceptionInDomain(3, 5);
+ sendValidMessage(3);
+ sendValidMessage(3);
+ sendMessageThatTriggersRetryableExceptionInDomain(3, 66);
+ sendMessageThatTriggersRetryableExceptionInDomain(3, 6);
+ sendMessageThatTriggersRetryableExceptionInDomain(3, 66);
+ sendMessageThatTriggersRetryableExceptionInDomain(3, 3);
+ sendValidMessage(3);
+ sendMessageThatTriggersRetryableExceptionInDomain(3, 66);
+ sendValidMessage(3);
+ sendMessageThatTriggersRetryableExceptionInDomain(3, 1);
+ sendValidMessage(3);
+
+ Awaitility
+ .await("All other messages are eventually consumed")
+ .atMost(Duration.ofSeconds(15))
+ .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(16));
+ }
+
+ @DisplayName("Three messages, that trigger retryable exceptions in the domain-logic, are retried and all messages are eventually consumed")
+ @ParameterizedTest(name = "Number of failures for the 1. message in partition 3: {0}, number of failures for the 1. message in partition 6: {1}, number of failures for the 2. message in partition 6: {2}")
+ @CsvSource({ "1,1,1", "6,3,4", "4,5,2", "1,2,3", "6,6,6" })
+ void testThreeMessagesCauseRetryableDomainErrors(
+ int numFailuresForMessageA,
+ int numFailuresForMessageB,
+ int numFailuresForMessageC)
+ {
+ createExampleConsumer();
+
+ sendValidMessage(0);
+ sendValidMessage(1);
+ sendValidMessage(2);
+ sendMessageThatTriggersRetryableExceptionInDomain(3,numFailuresForMessageA);
+ sendValidMessage(4);
+ sendValidMessage(5);
+ sendMessageThatTriggersRetryableExceptionInDomain(6,numFailuresForMessageB);
+ sendValidMessage(7);
+ sendValidMessage(8);
+ sendValidMessage(9);
+ sendValidMessage(0);
+ sendValidMessage(1);
+ sendValidMessage(2);
+ sendValidMessage(3);
+ sendValidMessage(4);
+ sendValidMessage(5);
+ sendMessageThatTriggersRetryableExceptionInDomain(6,numFailuresForMessageC);
+ sendValidMessage(7);
+ sendValidMessage(8);
+ sendValidMessage(9);
+ sendValidMessage(0);
+ sendValidMessage(1);
+ sendValidMessage(2);
+ sendValidMessage(3);
+ sendValidMessage(4);
+ sendValidMessage(5);
+ sendValidMessage(6);
+ sendValidMessage(7);
+ sendValidMessage(8);
+ sendValidMessage(9);
+
+ Awaitility
+ .await("All messages are eventually consumed")
+ .atMost(Duration.ofSeconds(20))
+ .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(30));
+ }
+
+ @DisplayName("Three messages, that trigger retryable exceptions in the domain-logic, are retried, but one of them fails too often and is skipped, still all other messages are eventually consumed")
+ @ParameterizedTest(name = "Number of failures for the 1. message in partition 3: {0}, number of failures for the 1. message in partition 6: {1}, number of failures for the 2. message in partition 6: {2}")
+ @CsvSource({ "66,3,4", "4,66,2", "1,2,66" })
+ void testThreeMessagesCauseRetryableDomainErrorsAndOneFailsTooOften(
+ int numFailuresForMessageA,
+ int numFailuresForMessageB,
+ int numFailuresForMessageC)
+ {
+ createExampleConsumer();
+
+ sendValidMessage(0);
+ sendValidMessage(1);
+ sendValidMessage(2);
+ sendMessageThatTriggersRetryableExceptionInDomain(3,numFailuresForMessageA);
+ sendValidMessage(4);
+ sendValidMessage(5);
+ sendMessageThatTriggersRetryableExceptionInDomain(6,numFailuresForMessageB);
+ sendValidMessage(7);
+ sendValidMessage(8);
+ sendValidMessage(9);
+ sendValidMessage(0);
+ sendValidMessage(1);
+ sendValidMessage(2);
+ sendValidMessage(3);
+ sendValidMessage(4);
+ sendValidMessage(5);
+ sendMessageThatTriggersRetryableExceptionInDomain(6,numFailuresForMessageC);
+ sendValidMessage(7);
+ sendValidMessage(8);
+ sendValidMessage(9);
+ sendValidMessage(0);
+ sendValidMessage(1);
+ sendValidMessage(2);
+ sendValidMessage(3);
+ sendValidMessage(4);
+ sendValidMessage(5);
+ sendValidMessage(6);
+ sendValidMessage(7);
+ sendValidMessage(8);
+ sendValidMessage(9);
+
+ Awaitility
+ .await("All other messages are eventually consumed")
+ .atMost(Duration.ofSeconds(20))
+ .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(29));
+ }
+
+ @DisplayName("A message, that triggers a retryable exception in the domain-logic, is retried 3 times with a fixed back-of and all messages are eventually consumed")
+ @ParameterizedTest(name = "Back-Off millis: {0}")
+ @ValueSource(ints = { 100, 250, 500, 1000 })
+ void testOneMessageIsRetriedWithFixedBackOff(int backOffMillis)
+ {
+ BackOff backOff = new FixedBackOff(backOffMillis, 3);
+ createExampleConsumer(backOff);
+
+ sendValidMessage(0);
+ sendValidMessage(1);
+ sendValidMessage(2);
+ sendMessageThatTriggersRetryableExceptionInDomain(3, 3);
+ sendValidMessage(4);
+ sendValidMessage(5);
+ sendValidMessage(6);
+ sendValidMessage(7);
+ sendValidMessage(8);
+ sendValidMessage(9);
+ sendValidMessage(0);
+ sendValidMessage(1);
+ sendValidMessage(2);
+ sendValidMessage(3);
+ sendValidMessage(4);
+ sendValidMessage(5);
+ sendValidMessage(6);
+ sendValidMessage(7);
+ sendValidMessage(8);
+ sendValidMessage(9);
+
+ Awaitility
+ .await("All messages are eventually consumed")
+ .atMost(Duration.ofSeconds(15))
+ .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(20));
+ }
+
static final String ID = "TEST";
static final String TOPIC = "ExampleConsumerTest_TEST";
static final int NUM_PARTITIONS = 10;
+ static final int NUM_RETRIES = 6;
+ static final int POLL_REQUEST_TIMEOUT_MS = 50;
+ static final int MAX_POLL_INTERVALL_MS = 500;
+ static final int MAX_TIME_PER_RECORD_MS = 100;
+ static final int FETCH_MAX_WAIT_MS = 50;
+ static final int MIN_SLACK_PER_POLL_INTERVAL_MS = 100;
@Autowired
KafkaTemplate<String, byte[]> kafkaTemplate;
+ @Autowired ApplicationProperties applicationProperties;
+ @Autowired KafkaProperties kafkaProperties;
+ @Autowired Clock clock;
final Serializer<V> serializer = createSerializer();
final long[] currentOffsets = new long[NUM_PARTITIONS];
abstract Consumer<?, ?> createConsumer(KafkaProperties properties);
abstract V createValidMessage();
abstract V createMessageThatTriggersRuntimeException();
+ abstract V createMessageThatTriggersNonRetryableException();
+ abstract V createMessageThatTriggersRetryableException(int numFailures);
- @BeforeEach
- void createExampleConsumer(@Autowired KafkaProperties properties)
+ void createExampleConsumer()
+ {
+ createExampleConsumer(new FixedBackOff(0l, applicationProperties.getConsumerProperties().getNumRetries()));
+ }
+
+ void createExampleConsumer(BackOff backOff)
{
exampleConsumer = new ExampleConsumer(
ID,
TOPIC,
- createConsumer(properties),
+ createConsumer(kafkaProperties),
mockRecordHandler,
+ clock,
+ applicationProperties.getConsumerProperties().getPollRequestTimeout(),
+ Duration.ofMillis(MAX_POLL_INTERVALL_MS),
+ applicationProperties.getConsumerProperties().getMaxTimePerRecord(),
+ applicationProperties.getConsumerProperties().getMinSlackPerPollInterval(),
+ backOff,
() -> isTerminatedExceptionally.set(true));
}
+ @BeforeEach
+ void resetParameters()
+ {
+ mockRecordHandler.normalRecordHandlingDelay = Duration.ofMillis(0);
+ mockRecordHandler.exceptionalRecordHandlingDelay = Duration.ofMillis(0);
+ }
+
@AfterEach
void resetSetup(@Autowired AdminClient adminClient) throws InterruptedException, ExecutionException
{
send(partition, createMessageThatTriggersRuntimeException());
}
+ private void sendMessageThatTriggersNonRetryableExceptionInDomain(int partition)
+ {
+ send(partition, serializer.serialize(TOPIC, createMessageThatTriggersNonRetryableException()));
+ }
+
+ private void sendMessageThatTriggersRetryableExceptionInDomain(int partition, int numFailures)
+ {
+ send(partition, serializer.serialize(TOPIC, createMessageThatTriggersRetryableException(numFailures)));
+ }
+
private void send(int partition, V message)
{
send(partition, serializer.serialize(TOPIC, message));
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBroker);
return AdminClient.create(properties);
}
+
+ @Bean
+ Clock clock()
+ {
+ return Clock.systemDefaultZone();
+ }
}
}
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+
@RequiredArgsConstructor
@Slf4j
public abstract class AbstractMockRecordHandler<V> implements RecordHandler<String, V>
{
+ final Map<OffsetInPartition, Integer> retriableErrors = new HashMap<>();
+
+ Duration normalRecordHandlingDelay;
+ Duration exceptionalRecordHandlingDelay;
+
private int numMessagesHandled = 0;
public int getNumMessagesHandled()
Integer partition,
Long offset,
String key,
- V value)
+ V value) throws RetryableErrorException, NonRetryableErrorException
{
- generateError(value);
+ generateError(new OffsetInPartition(offset, partition), value);
+ sleep(normalRecordHandlingDelay);
numMessagesHandled++;
log.trace("Handled {} messages so far", numMessagesHandled);
}
- abstract void generateError(V value);
+ abstract void generateError(
+ OffsetInPartition offset,
+ V value) throws RetryableErrorException, NonRetryableErrorException;
+
+ void sleep(Duration duration)
+ {
+ try
+ {
+ Thread.sleep(duration);
+ }
+ catch(InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ }
public void clear()
{
+ retriableErrors.clear();
numMessagesHandled = 0;
}
+
+
+ record OffsetInPartition(long offset, int partition) {}
}
--- /dev/null
+package de.juplo.kafka;
+
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.springframework.util.backoff.BackOff;
+import org.springframework.util.backoff.BackOffExecution;
+
+import java.time.Clock;
+import java.time.Instant;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.BDDMockito.given;
+
+
+@ExtendWith(MockitoExtension.class)
+class BackOffStateTest
+{
+ final static String ID = "TEST";
+ final static long OFFSET = 666;
+ final static Instant NOW = Instant.now();
+ final static long BACK_OFF = 1000l;
+
+
+ @Mock Clock clock;
+ @Mock BackOff backOff;
+ @Mock BackOffExecution backOffExecution;
+
+
+ private BackOffState NotStartedBackOffState()
+ {
+ // GIVEN
+
+ // WHEN
+ BackOffState backOffState = new BackOffState(ID, clock, backOff);
+
+ return backOffState;
+ }
+
+ @Test
+ @DisplayName("A not started BackOffState is not waiting for a retry")
+ void NotStartedBackOffStateIsNotWaitingForRetry()
+ {
+ // GIVEN
+ BackOffState backOffState = NotStartedBackOffState();
+
+ // WHEN
+
+ // THEN
+ assertThat(backOffState.isWaitingForNextRetry()).isFalse();
+ }
+
+ @Test
+ @DisplayName("A not started BackOffState is not started")
+ void NotStartedBackOffStateIsNotStarted()
+ {
+ // GIVEN
+ BackOffState backOffState = NotStartedBackOffState();
+
+ // WHEN
+
+ // THEN
+ assertThat(backOffState.isStarted(OFFSET)).isFalse();
+ }
+
+
+ private BackOffState StartedBackoffStateWithNoRetries()
+ {
+ // GIVEN
+ given(backOff.start()).willReturn(backOffExecution);
+ given(backOffExecution.nextBackOff()).willReturn(BackOffExecution.STOP);
+
+ // WHEN
+ BackOffState backOffState = new BackOffState(ID, clock, backOff);
+ backOffState.start(OFFSET);
+
+ return backOffState;
+ }
+
+ @Test
+ @DisplayName("A started BackOffState with no retries is not waiting for a retry")
+ void StartedBackOffStateWithNoRetriesIsNotWaitingForRetry()
+ {
+ // GIVEN
+ BackOffState backOffState = StartedBackoffStateWithNoRetries();
+
+ // WHEN
+
+ // THEN
+ assertThat(backOffState.isWaitingForNextRetry()).isFalse();
+ }
+
+ @Test
+ @DisplayName("A started BackOffState with no retries is started")
+ void StartedBackOffStateWithNoRetriesIsStarted()
+ {
+ // GIVEN
+ BackOffState backOffState = StartedBackoffStateWithNoRetries();
+
+ // WHEN
+
+ // THEN
+ assertThat(backOffState.isStarted(OFFSET)).isTrue();
+ }
+
+ @Test
+ @DisplayName("A started BackOffState with no retries is completed")
+ void StartedBackOffStateWithNoRetriesIsCompleted()
+ {
+ // GIVEN
+ BackOffState backOffState = StartedBackoffStateWithNoRetries();
+
+ // WHEN
+
+ // THEN
+ assertThat(backOffState.isCompleted(OFFSET)).isTrue();
+ }
+
+
+ private BackOffState StartedBackoffStateWithRetries()
+ {
+ // GIVEN
+ given(clock.instant()).willReturn(NOW);
+ given(backOff.start()).willReturn(backOffExecution);
+ given(backOffExecution.nextBackOff()).willReturn(BACK_OFF);
+
+ // WHEN
+ BackOffState backOffState = new BackOffState(ID, clock, backOff);
+ backOffState.start(OFFSET);
+
+ return backOffState;
+ }
+
+ @Test
+ @DisplayName("A started BackOffState is waiting for a retry if the time is not due")
+ void StartedBackOffStateIsWaitingForRetryIfTimeIsNotDue()
+ {
+ // GIVEN
+ BackOffState backOffState = StartedBackoffStateWithRetries();
+ given(clock.instant()).willReturn(NOW.plusMillis(BACK_OFF));
+
+ // WHEN
+ boolean result = backOffState.isWaitingForNextRetry();
+
+ // THEN
+ assertThat(result).isTrue();
+ }
+
+ @Test
+ @DisplayName("A started BackOffState is started if the time is not due")
+ void StartedBackOffStateIsStarted()
+ {
+ // GIVEN
+ BackOffState backOffState = StartedBackoffStateWithRetries();
+ given(clock.instant()).willReturn(NOW.plusMillis(BACK_OFF));
+
+ // WHEN
+ backOffState.isWaitingForNextRetry();
+
+ // THEN
+ assertThat(backOffState.isStarted(OFFSET)).isTrue();
+ }
+
+ @Test
+ @DisplayName("A started BackOffState is not completed if the time is not due")
+ void StartedBackOffStateIsNotCompletedIfTimeIsNotDue()
+ {
+ // GIVEN
+ BackOffState backOffState = StartedBackoffStateWithRetries();
+ given(clock.instant()).willReturn(NOW.plusMillis(BACK_OFF));
+
+ // WHEN
+ backOffState.isWaitingForNextRetry();
+
+ // THEN
+ assertThat(backOffState.isCompleted(OFFSET)).isFalse();
+ }
+
+ @Test
+ @DisplayName("A started BackOffState is not waiting for a retry if the time is due but the retry not yet completed")
+ void StartedBackOffStateIsNotWaitingForRetryIfTheTimeIsDueButRetryNotCompleted()
+ {
+ // GIVEN
+ BackOffState backOffState = StartedBackoffStateWithRetries();
+ given(clock.instant()).willReturn(NOW.plusMillis(BACK_OFF).plusMillis(1));
+ given(backOffExecution.nextBackOff()).willReturn(BACK_OFF);
+
+ // WHEN
+ boolean result = backOffState.isWaitingForNextRetry();
+
+ // THEN
+ assertThat(result).isFalse();
+ }
+
+ @Test
+ @DisplayName("A started BackOffState is started if the time is due but the retry not yet completed")
+ void StartedBackOffStateIsStartedIfTheTimeIsDueButRetryNotCompleted()
+ {
+ // GIVEN
+ BackOffState backOffState = StartedBackoffStateWithRetries();
+ given(clock.instant()).willReturn(NOW.plusMillis(BACK_OFF).plusMillis(1));
+ given(backOffExecution.nextBackOff()).willReturn(BACK_OFF);
+
+ // WHEN
+ backOffState.isWaitingForNextRetry();
+
+ // THEN
+ assertThat(backOffState.isStarted(OFFSET)).isTrue();
+ }
+
+ @Test
+ @DisplayName("A started BackOffState is not completed if the time is due but the retry not yet completed")
+ void StartedBackOffStateIsNotCompletedIfTheTimeIsDueButRetryNotCompleted()
+ {
+ // GIVEN
+ BackOffState backOffState = StartedBackoffStateWithRetries();
+ given(clock.instant()).willReturn(NOW.plusMillis(BACK_OFF).plusMillis(1));
+ given(backOffExecution.nextBackOff()).willReturn(BACK_OFF);
+
+ // WHEN
+ backOffState.isWaitingForNextRetry();
+
+ // THEN
+ assertThat(backOffState.isCompleted(OFFSET)).isFalse();
+ }
+
+ @Test
+ @DisplayName("A started BackOffState is not waiting for a retry if the time is due and the retry is completed")
+ void StartedBackOffStateIsNotWaitingForRetryIfTheTimeIsDueAndRetryIsCompleted()
+ {
+ // GIVEN
+ BackOffState backOffState = StartedBackoffStateWithRetries();
+ given(clock.instant()).willReturn(NOW.plusMillis(BACK_OFF).plusMillis(1));
+ given(backOffExecution.nextBackOff()).willReturn(BackOffExecution.STOP);
+
+ // WHEN
+ boolean result = backOffState.isWaitingForNextRetry();
+
+ // THEN
+ assertThat(result).isFalse();
+ }
+
+ @Test
+ @DisplayName("A started BackOffState is started if the time is due and the retry is completed")
+ void StartedBackOffStateIsStartedIfTheTimeIsDueAndRetryIsCompleted()
+ {
+ // GIVEN
+ BackOffState backOffState = StartedBackoffStateWithRetries();
+ given(clock.instant()).willReturn(NOW.plusMillis(BACK_OFF).plusMillis(1));
+ given(backOffExecution.nextBackOff()).willReturn(BackOffExecution.STOP);
+
+ // WHEN
+ backOffState.isWaitingForNextRetry();
+
+ // THEN
+ assertThat(backOffState.isStarted(OFFSET)).isTrue();
+ }
+
+ @Test
+ @DisplayName("A started BackOffState is completed if the time is due and the retry is completed")
+ void StartedBackOffStateIsCompletedIfTheTimeIsDueAndRetryIsCompleted()
+ {
+ // GIVEN
+ BackOffState backOffState = StartedBackoffStateWithRetries();
+ given(clock.instant()).willReturn(NOW.plusMillis(BACK_OFF).plusMillis(1));
+ given(backOffExecution.nextBackOff()).willReturn(BackOffExecution.STOP);
+
+ // WHEN
+ backOffState.isWaitingForNextRetry();
+
+ // THEN
+ assertThat(backOffState.isCompleted(OFFSET)).isTrue();
+ }
+
+ @Test
+ @DisplayName("A started BackOffState is not waiting for a retry after a reset")
+ void StartedBackOffStateIsNotWaitingForRetryAfterReset()
+ {
+ // GIVEN
+ BackOffState backOffState = StartedBackoffStateWithRetries();
+
+ // WHEN
+ backOffState.reset();
+
+ // THEN
+ assertThat(backOffState.isWaitingForNextRetry()).isFalse();
+ }
+
+ @Test
+ @DisplayName("A started BackOffState is not started after a reset")
+ void StartedBackOffStateIsNotStartedAfterReset()
+ {
+ // GIVEN
+ BackOffState backOffState = StartedBackoffStateWithRetries();
+
+ // WHEN
+ backOffState.reset();
+
+ // THEN
+ assertThat(backOffState.isStarted(OFFSET)).isFalse();
+ }
+}
return VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION;
}
+ @Override
+ Long createMessageThatTriggersNonRetryableException()
+ {
+ return VALUE_THAT_TRIGGERS_NONRETRYABLEEXCEPTION;
+ }
+
+ @Override
+ Long createMessageThatTriggersRetryableException(int numFailures)
+ {
+ return VALUE_THAT_TRIGGERS_RETRYABLEEXCEPTION * numFailures;
+ }
+
public final static long VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION = -1;
+ public final static long VALUE_THAT_TRIGGERS_RETRYABLEEXCEPTION = -2;
+ public final static long VALUE_THAT_TRIGGERS_NONRETRYABLEEXCEPTION = -3;
}
import lombok.extern.slf4j.Slf4j;
-import static de.juplo.kafka.LongExampleConsumerTest.VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION;
+import static de.juplo.kafka.LongExampleConsumerTest.*;
@Slf4j
public class LongMockRecordHandler extends AbstractMockRecordHandler<Long>
{
- void generateError(Long value)
+ @Override
+ void generateError(
+ OffsetInPartition offset,
+ Long value) throws RetryableErrorException, NonRetryableErrorException
{
+ if (value == null || value > -1)
+ {
+ // Valid message...
+ return;
+ }
+
if (value == VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION)
{
throw new RuntimeException("Unexpected application error!");
}
- log.info("Not specifically mapped error: {}", value);
+ if (value == VALUE_THAT_TRIGGERS_NONRETRYABLEEXCEPTION)
+ {
+ throw new NonRetryableErrorException("Non-Retryable application error!");
+ }
+
+ if ((float)value % (float) VALUE_THAT_TRIGGERS_RETRYABLEEXCEPTION == 0f)
+ {
+ int totalOccurrences = (int) (value / VALUE_THAT_TRIGGERS_RETRYABLEEXCEPTION);
+ int occurrence = retriableErrors.compute(
+ offset,
+ (k, v) ->
+ {
+ if (v == null)
+ {
+ v = totalOccurrences;
+ }
+ else
+ {
+ v--;
+ }
+
+ return v;
+ });
+
+ if (occurrence <= 0)
+ {
+ retriableErrors.remove(offset);
+ }
+ else
+ {
+ log.debug(
+ "Simulating occurrence #{} of {} for a retryable error at offset {} in partition {}",
+ totalOccurrences - occurrence + 1,
+ totalOccurrences,
+ offset.offset(),
+ offset.partition());
+ sleep(exceptionalRecordHandlingDelay);
+ throw new RetryableErrorException("Retryable application error! Occurrence #" + (totalOccurrences - occurrence + 1));
+ }
+
+ log.info("Simulating a resolved retryable error after {} occurrences of the error", totalOccurrences);
+ }
+ else
+ {
+ log.warn("Not specifically mapped error: {}", value);
+ }
}
}