Version of the `spring-consumer` with complete error handling
consumer/spring-consumer--error-handling--generics4all consumer/spring-consumer--error-handling--generics4some
author    Kai Moritz <kai@juplo.de>
          Sat, 11 Jan 2025 16:10:02 +0000 (17:10 +0100)
committer Kai Moritz <kai@juplo.de>
          Fri, 21 Feb 2025 15:14:13 +0000 (16:14 +0100)
* Exceptions & configuration for retryable and non-retryable errors (see the handler sketch after this list)
* RED: Defined the expected behaviour for the error handling
* GREEN: Implemented the extended error handling
* Made the timeout for the poll request configurable
* Tightened the timings for the `ExampleConsumerTest`
* No delay for errors in the retry tests
* Additional log message for the retry flow
* Removed a distinction that was never evaluated
* Made the creation of the `ExampleConsumer` in the tests configurable via a method
* GREEN: Defined the expected behaviour for different delays
* The slack for the poll interval is explicitly configurable
* Added a test for retries with a fixed back-off to the `ExampleConsumerTest`
* The offset of an inactive `BackOffState` should be an invalid value
* Made the inner class `BackOffState` static
* Extracted the inner class `BackOffState`
* Improved the logging in `BackOffState`
* Clearer method names in `BackOffState`
* RED: Implemented a unit test for `BackOffState`
* GREEN: Fixed an error in the initialisation of `BackOffState`
* Extracted duplicated code in `BackOffStateTest` into methods
* Mocking with `@Mock` at class level avoids a flood of parameters
* Removed meaningless tests from `BackOffStateTest`
* Extracted the back-off time in `BackOffStateTest` into a static variable
* Simplified and improved the log message of `BackOffState`
* RED: Defined the corrected behaviour for `BackOffState`
* GREEN: Fixed the implementation of `BackOffState`
* Switched the `ExampleConsumerTest` to AssertJ
* Made `fetch.max.wait` configurable
* Added a test for many errors in one partition to `BackOffStateTest`
* The check of the `BackOffState` state always takes the offset into account
* Unconditional reset (and a better method name) for `BackOffState`
* `BackOffState` is only created once and afterwards reset and restarted
* Removed a superfluous attribute from `BackOffState`
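
As a quick orientation for the error contract this series introduces (referenced from the first item above): a minimal sketch of a domain-level handler that distinguishes the two new exception types. Only the `RecordHandler` signature and the exception classes come from the diff below; the `PricingRecordHandler`, `lookupPrice()` and its failure modes are illustrative assumptions, not part of the commit.

package de.juplo.kafka;

// Hypothetical handler, sketched against the RecordHandler interface changed in this commit.
public class PricingRecordHandler implements RecordHandler<String, Long>
{
  @Override
  public void handleRecord(
      String topic,
      Integer partition,
      Long offset,
      String key,
      Long value) throws RetriableErrorException, NonRetriableErrorException
  {
    if (value == null)
    {
      // Permanently broken input: skipping is the only sensible reaction.
      throw new NonRetriableErrorException("Record without value at offset " + offset);
    }

    try
    {
      lookupPrice(key, value); // assumed domain call that may fail transiently
    }
    catch (RuntimeException e)
    {
      // Transient failure: tell the consumer to back off and retry this offset.
      throw new RetriableErrorException("Lookup failed for " + key + ": " + e.getMessage());
    }
  }

  private void lookupPrice(String key, Long value)
  {
    // Placeholder for the real domain logic.
  }
}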

15 files changed:
README.sh
build.gradle
docker/docker-compose.yml
pom.xml
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/ApplicationProperties.java
src/main/java/de/juplo/kafka/BackOffState.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/ExampleConsumer.java
src/main/java/de/juplo/kafka/NonRetriableErrorException.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/RecordHandler.java
src/main/java/de/juplo/kafka/RetriableErrorException.java [new file with mode: 0644]
src/main/resources/application.yml
src/test/java/de/juplo/kafka/BackOffStateTest.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/ExampleConsumerTest.java
src/test/java/de/juplo/kafka/MockRecordHandler.java

index 392b237..fcf2322 100755 (executable)
--- a/README.sh
+++ b/README.sh
@@ -1,6 +1,6 @@
 #!/bin/bash
 
-IMAGE=juplo/spring-consumer:1.1-deserialization-error-SNAPSHOT
+IMAGE=juplo/spring-consumer:1.1-error-handling-SNAPSHOT
 
 if [ "$1" = "cleanup" ]
 then
index 9df5cda..3df5a51 100644 (file)
@@ -8,7 +8,7 @@ plugins {
 }
 
 group = 'de.juplo.kafka'
-version = '1.1-deserialization-error-SNAPSHOT'
+version = '1.1-error-handling-SNAPSHOT'
 
 java {
        toolchain {
index 56003f7..e23f799 100644 (file)
@@ -150,14 +150,14 @@ services:
       juplo.consumer.topic: test
 
   peter:
-    image: juplo/spring-consumer:1.1-deserialization-error-SNAPSHOT
+    image: juplo/spring-consumer:1.1-error-handling-SNAPSHOT
     environment:
       juplo.bootstrap-server: kafka:9092
       juplo.client-id: peter
       juplo.consumer.topic: test
 
   ute:
-    image: juplo/spring-consumer:1.1-deserialization-error-SNAPSHOT
+    image: juplo/spring-consumer:1.1-error-handling-SNAPSHOT
     environment:
       juplo.bootstrap-server: kafka:9092
       juplo.client-id: ute
diff --git a/pom.xml b/pom.xml
index b5cb106..6bf1dfe 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -15,7 +15,7 @@
   <artifactId>spring-consumer</artifactId>
   <name>Spring Consumer</name>
   <description>Super Simple Consumer-Group, that is implemented as Spring-Boot application and configured by Spring Kafka</description>
-  <version>1.1-deserialization-error-SNAPSHOT</version>
+  <version>1.1-error-handling-SNAPSHOT</version>
 
   <properties>
     <java.version>21</java.version>
index c417484..eeef2a6 100644 (file)
@@ -10,7 +10,10 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties
 import org.springframework.context.ConfigurableApplicationContext;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
+import org.springframework.util.backoff.BackOff;
+import org.springframework.util.backoff.FixedBackOff;
 
+import java.time.Clock;
 import java.util.Properties;
 
 
@@ -24,6 +27,8 @@ public class ApplicationConfiguration
     Consumer<String, Long> kafkaConsumer,
     RecordHandler<String, Long> recordHandler,
     ApplicationProperties properties,
+    Clock clock,
+    BackOff backOffStrategy,
     ConfigurableApplicationContext applicationContext)
   {
     return
@@ -32,6 +37,12 @@ public class ApplicationConfiguration
         properties.getConsumerProperties().getTopic(),
         kafkaConsumer,
         recordHandler,
+        clock,
+        properties.getConsumerProperties().getPollRequestTimeout(),
+        properties.getConsumerProperties().getMaxPollInterval(),
+        properties.getConsumerProperties().getMaxTimePerRecord(),
+        properties.getConsumerProperties().getMinSlackPerPollInterval(),
+        backOffStrategy,
         () -> applicationContext.close());
   }
 
@@ -41,6 +52,18 @@ public class ApplicationConfiguration
     return (topic, partition, offset, key, value) -> log.info("No-Ops Handler called for {}={}", key, value);
   }
 
+  @Bean
+  public BackOff backOffStrategy(ApplicationProperties properties)
+  {
+    return new FixedBackOff(0l, properties.getConsumerProperties().getNumRetries());
+  }
+
+  @Bean
+  public Clock clock()
+  {
+    return Clock.systemDefaultZone();
+  }
+
   @Bean(destroyMethod = "")
   public KafkaConsumer<String, Long> kafkaConsumer(ApplicationProperties properties)
   {
@@ -52,11 +75,11 @@ public class ApplicationConfiguration
     {
       props.put("auto.offset.reset", properties.getConsumerProperties().getAutoOffsetReset().name());
     }
-    if (properties.getConsumerProperties().getAutoCommitInterval() != null)
-    {
-      props.put("auto.commit.interval", properties.getConsumerProperties().getAutoCommitInterval());
-    }
+    props.put("auto.commit.interval", properties.getConsumerProperties().getAutoCommitInterval());
     props.put("metadata.maxage.ms", 5000); //  5 Sekunden
+    props.put("max.poll.interval.ms", (int) properties.getConsumer().getMaxPollInterval().toMillis());
+    props.put("max.poll.interval.records", properties.getConsumer().getMaxPollRecords());
+    props.put("fetch.max.wait.ms", (int)properties.getConsumer().getFetchMaxWait().toMillis());
     props.put("partition.assignment.strategy", StickyAssignor.class.getName());
     props.put("key.deserializer", StringDeserializer.class.getName());
     props.put("value.deserializer", LongDeserializer.class.getName());
index c8193c9..b84c2d2 100644 (file)
@@ -45,7 +45,22 @@ public class ApplicationProperties
     @NotEmpty
     private String topic;
     private OffsetReset autoOffsetReset;
+    @NotNull
     private Duration autoCommitInterval;
+    @NotNull
+    private Duration pollRequestTimeout;
+    @NotNull
+    private Duration maxPollInterval;
+    @NotNull
+    private int maxPollRecords;
+    @NotNull
+    private Duration fetchMaxWait;
+    @NotNull
+    private Duration maxTimePerRecord;
+    @NotNull
+    private Duration minSlackPerPollInterval;
+    @NotNull
+    private int numRetries;
 
     enum OffsetReset { latest, earliest, none }
   }
diff --git a/src/main/java/de/juplo/kafka/BackOffState.java b/src/main/java/de/juplo/kafka/BackOffState.java
new file mode 100644 (file)
index 0000000..8c6785e
--- /dev/null
@@ -0,0 +1,98 @@
+package de.juplo.kafka;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.util.backoff.BackOff;
+import org.springframework.util.backoff.BackOffExecution;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+
+
+@RequiredArgsConstructor
+@Slf4j
+class BackOffState
+{
+  private final String logPrefix;
+  private final Clock clock;
+  private final BackOff backOffStrategy;
+
+  @Getter
+  private long offset;
+  private BackOffExecution backOffExecution;
+  private int numRetries = 0;
+  private Instant timeNextRetryIsDue;
+
+
+  void start(long offset)
+  {
+    this.offset = offset;
+    log.info("{} - Back-Off requested for offset={}", logPrefix, offset);
+    backOffExecution = backOffStrategy.start();
+    initializeNextBackOff();
+  }
+
+  boolean isWaitingForNextRetry()
+  {
+    if (timeNextRetryIsDue == null)
+    {
+      return false;
+    }
+
+    Instant now = clock.instant();
+    Duration remaining = Duration.between(now, timeNextRetryIsDue);
+    if (remaining.isNegative())
+    {
+      log.info(
+        "{} - {}. retry for offset={}, lateness: {}",
+        logPrefix,
+        numRetries,
+        offset,
+        remaining.abs());
+      initializeNextBackOff();
+      return false;
+    }
+    else
+    {
+      log.info(
+        "{} - Next retry for offset={} is due in {}",
+        logPrefix,
+        offset,
+        remaining);
+      return true;
+    }
+  }
+
+  boolean isStarted(long offset)
+  {
+    return this.offset == offset && backOffExecution != null;
+  }
+
+  boolean isCompleted(long offset)
+  {
+    return this.offset == offset && timeNextRetryIsDue == null;
+  }
+
+  void reset()
+  {
+    timeNextRetryIsDue = null;
+    offset = -1l;
+  }
+
+  private void initializeNextBackOff()
+  {
+    long backOffMillis = backOffExecution.nextBackOff();
+
+    if (backOffMillis == BackOffExecution.STOP)
+    {
+      timeNextRetryIsDue = null;
+    }
+    else
+    {
+      numRetries++;
+      timeNextRetryIsDue = clock.instant().plusMillis(backOffMillis);
+    }
+  }
+}
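
To make the state machine above easier to follow, here is a condensed, purely illustrative walk-through of the `BackOffState` lifecycle as the consumer loop drives it. The constructor arguments and method names come from the class above; the concrete offset, the timings and the surrounding `main()` are assumptions made for the sake of the example.

package de.juplo.kafka;

import java.time.Clock;
import org.springframework.util.backoff.FixedBackOff;

// Illustrative walk-through of the BackOffState lifecycle (not part of the commit).
public class BackOffStateWalkthrough
{
  public static void main(String[] args)
  {
    Clock clock = Clock.systemDefaultZone();
    // Up to 3 retries, 1 second apart -- analogous to the FixedBackOff bean in ApplicationConfiguration
    BackOffState state = new BackOffState("demo - partition=3", clock, new FixedBackOff(1000L, 3));

    long offset = 42L; // assumed offset of a record that raised a RetriableErrorException

    if (!state.isStarted(offset))
    {
      state.start(offset); // remembers the offset and schedules the first retry
    }

    if (state.isWaitingForNextRetry())
    {
      // the consumer seeks back to state.getOffset() and leaves the partition alone for now
    }
    else if (state.isCompleted(offset))
    {
      // all retries are used up -- the consumer logs a warning and skips the record
    }

    state.reset(); // called once the record was finally handled or skipped
  }
}
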
index b2c59f8..821d0e9 100644 (file)
@@ -2,23 +2,36 @@ package de.juplo.kafka;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.RecordDeserializationException;
 import org.apache.kafka.common.errors.WakeupException;
+import org.springframework.util.backoff.BackOff;
 
+import java.time.Clock;
 import java.time.Duration;
+import java.time.Instant;
 import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
 
 
 @Slf4j
-public class ExampleConsumer<K, V> implements Runnable
+public class ExampleConsumer<K, V> implements ConsumerRebalanceListener, Runnable
 {
   private final String id;
   private final String topic;
   private final Consumer<K, V> consumer;
   private final RecordHandler<K, V> recordHandler;
   private final Thread workerThread;
+  private final Clock clock;
+  private final Duration pollRequestTimeout;
+  private final Duration maxPollInterval;
+  private final Duration minTimeForNextRecord;
+  private final BackOff backOffStrategy;
+  private final BackOffState[] backOffState;
   private final Runnable closeCallback;
 
   private volatile boolean running = false;
@@ -30,12 +43,27 @@ public class ExampleConsumer<K, V> implements Runnable
     String topic,
     Consumer<K, V> consumer,
     RecordHandler<K, V> recordHandler,
+    Clock clock,
+    Duration pollRequestTimeout,
+    Duration maxPollInterval,
+    Duration maxTimePerRecord,
+    Duration minSlackPerPollInterval,
+    BackOff backOffStrategy,
     Runnable closeCallback)
   {
     this.id = clientId;
     this.topic = topic;
     this.consumer = consumer;
     this.recordHandler = recordHandler;
+    this.clock = clock;
+    this.pollRequestTimeout = pollRequestTimeout;
+    this.maxPollInterval = maxPollInterval;
+    this.minTimeForNextRecord = maxTimePerRecord.plus(minSlackPerPollInterval);
+    this.backOffStrategy = backOffStrategy;
+
+    int numPartitions = consumer.partitionsFor(topic).size();
+    log.info("{} - Topic {} has {} partitions", id, topic, numPartitions);
+    this.backOffState = new BackOffState[numPartitions];
 
     workerThread = new Thread(this, "ExampleConsumer Worker-Thread");
     workerThread.start();
@@ -50,24 +78,107 @@ public class ExampleConsumer<K, V> implements Runnable
     try
     {
       log.info("{} - Subscribing to topic {}", id, topic);
-      consumer.subscribe(Arrays.asList(topic));
+      consumer.subscribe(Arrays.asList(topic), this);
       running = true;
 
       while (running)
       {
         try
         {
-          ConsumerRecords<K, V> records = consumer.poll(Duration.ofSeconds(1));
+          ConsumerRecords<K, V> records = consumer.poll(pollRequestTimeout);
 
           log.info("{} - Received {} messages", id, records.count());
-          for (ConsumerRecord<K, V> record : records)
+
+          Instant deadline = clock.instant().plus(maxPollInterval);
+          boolean abortCurrentPoll = false;
+
+          for (TopicPartition topicPartition : records.partitions())
           {
-            handleRecord(
-              record.topic(),
-              record.partition(),
-              record.offset(),
-              record.key(),
-              record.value());
+            if (backOffState[topicPartition.partition()].isWaitingForNextRetry())
+            {
+              log.info("{} - {} is blocked, because it is waiting for a retry", id, topicPartition);
+              consumer.seek(topicPartition, backOffState[topicPartition.partition()].getOffset());
+              continue;
+            }
+
+            List<ConsumerRecord<K, V>> recordsForPartition = records.records(topicPartition);
+            log.debug(
+              "{} - Received {} messages for partition {}",
+              id,
+              recordsForPartition.size(),
+              topicPartition);
+
+            for (ConsumerRecord<K, V> record : recordsForPartition)
+            {
+              if (abortCurrentPoll)
+              {
+                consumer.seek(topicPartition, record.offset());
+                break;
+              }
+
+              Instant now = clock.instant();
+              Duration timeLeft = Duration.between(now, deadline);
+              log.trace("{} - Time left for current poll: {}", id, timeLeft);
+
+              if (timeLeft.minus(minTimeForNextRecord).isNegative())
+              {
+                log.info(
+                  "{} - Aborting record handling, because only {} are left until the poll-interval expires!",
+                  id,
+                  timeLeft);
+                abortCurrentPoll = true;
+                consumer.seek(topicPartition, record.offset());
+                break;
+              }
+
+              try
+              {
+                handleRecord(
+                  record.topic(),
+                  record.partition(),
+                  record.offset(),
+                  record.key(),
+                  record.value());
+              }
+              catch (RetriableErrorException e)
+              {
+                // Seeking to the offset of the record that raised the exception and
+                // leaving the loop afterwards retries the record
+                int partition = topicPartition.partition();
+                if (!backOffState[partition].isStarted(record.offset()))
+                {
+                  log.info("{} - First occurrence of a retryable error: {}", id, e.toString());
+                  backOffState[partition].start(record.offset());
+                  consumer.seek(topicPartition, record.offset());
+                  break;
+                }
+                else
+                {
+                  if (backOffState[partition].isCompleted(record.offset()))
+                  {
+                    log.warn("{} - Ignoring retryable error: {}", id, e.toString());
+                  }
+                  else
+                  {
+                    log.info(
+                      "{} - Retry in progress for offset={} in {}, error: {}",
+                      id,
+                      record.offset(),
+                      partition,
+                      e.toString());
+                    consumer.seek(topicPartition, record.offset());
+                    break;
+                  }
+                }
+              }
+              catch (NonRetriableErrorException e)
+              {
+                // Just ignore, to skip
+                log.warn("{} - Ignoring non-retryable error: {}", id, e.toString());
+              }
+
+              backOffState[topicPartition.partition()].reset();
+            }
           }
         }
         catch (RecordDeserializationException e)
@@ -106,7 +217,7 @@ public class ExampleConsumer<K, V> implements Runnable
     Integer partition,
     Long offset,
     K key,
-    V value)
+    V value) throws RetriableErrorException, NonRetriableErrorException
   {
     consumed++;
     log.info("{} - partition={}-{}, offset={}: {}={}", id, topic, partition, offset, key, value);
@@ -114,6 +225,23 @@ public class ExampleConsumer<K, V> implements Runnable
   }
 
 
+  @Override
+  public void onPartitionsAssigned(Collection<TopicPartition> partitions)
+  {
+    partitions.forEach(topicPartition ->
+      backOffState[topicPartition.partition()] = new BackOffState(
+        id + " - partition=" + topicPartition.partition(),
+        clock,
+        backOffStrategy));
+  }
+
+  @Override
+  public void onPartitionsRevoked(Collection<TopicPartition> partitions)
+  {
+    partitions.forEach(tp -> backOffState[tp.partition()] = null);
+  }
+
+
   public void shutdown() throws InterruptedException
   {
     log.info("{} joining the worker-thread...", id);
diff --git a/src/main/java/de/juplo/kafka/NonRetriableErrorException.java b/src/main/java/de/juplo/kafka/NonRetriableErrorException.java
new file mode 100644 (file)
index 0000000..0eb0ff2
--- /dev/null
@@ -0,0 +1,9 @@
+package de.juplo.kafka;
+
+public class NonRetriableErrorException extends Exception
+{
+  public NonRetriableErrorException(String message)
+  {
+    super(message);
+  }
+}
index a7b65af..5edcf58 100644 (file)
@@ -7,5 +7,5 @@ public interface RecordHandler<K, V>
     Integer partition,
     Long offset,
     K key,
-    V value);
+    V value) throws RetriableErrorException, NonRetriableErrorException;
 }
diff --git a/src/main/java/de/juplo/kafka/RetriableErrorException.java b/src/main/java/de/juplo/kafka/RetriableErrorException.java
new file mode 100644 (file)
index 0000000..598ddb0
--- /dev/null
@@ -0,0 +1,9 @@
+package de.juplo.kafka;
+
+public class RetriableErrorException extends Exception
+{
+  public RetriableErrorException(String message)
+  {
+    super(message);
+  }
+}
index 7a06731..6528b28 100644 (file)
@@ -6,6 +6,13 @@ juplo:
     topic: test
     auto-offset-reset: earliest
     auto-commit-interval: 5s
+    poll-request-timeout: 1s
+    max-poll-interval: 5m
+    max-poll-records: 500
+    fetch-max-wait: 500ms
+    max-time-per-record: 30s
+    min-slack-per-poll-interval: 1s
+    num-retries: 10
 management:
   endpoint:
     shutdown:
diff --git a/src/test/java/de/juplo/kafka/BackOffStateTest.java b/src/test/java/de/juplo/kafka/BackOffStateTest.java
new file mode 100644 (file)
index 0000000..f58f545
--- /dev/null
@@ -0,0 +1,304 @@
+package de.juplo.kafka;
+
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.springframework.util.backoff.BackOff;
+import org.springframework.util.backoff.BackOffExecution;
+
+import java.time.Clock;
+import java.time.Instant;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.BDDMockito.given;
+
+
+@ExtendWith(MockitoExtension.class)
+class BackOffStateTest
+{
+  final static String ID = "TEST";
+  final static long OFFSET = 666;
+  final static Instant NOW = Instant.now();
+  final static long BACK_OFF = 1000l;
+
+
+  @Mock Clock clock;
+  @Mock BackOff backOff;
+  @Mock BackOffExecution backOffExecution;
+
+
+  private BackOffState NotStartedBackOffState()
+  {
+    // GIVEN
+
+    // WHEN
+    BackOffState backOffState = new BackOffState(ID, clock, backOff);
+
+    return backOffState;
+  }
+
+  @Test
+  @DisplayName("A not started BackOffState is not waiting for a retry")
+  void NotStartedBackOffStateIsNotWaitingForRetry()
+  {
+    // GIVEN
+    BackOffState backOffState = NotStartedBackOffState();
+
+    // WHEN
+
+    // THEN
+    assertThat(backOffState.isWaitingForNextRetry()).isFalse();
+  }
+
+  @Test
+  @DisplayName("A not started BackOffState is not started")
+  void NotStartedBackOffStateIsNotStarted()
+  {
+    // GIVEN
+    BackOffState backOffState = NotStartedBackOffState();
+
+    // WHEN
+
+    // THEN
+    assertThat(backOffState.isStarted(OFFSET)).isFalse();
+  }
+
+
+  private BackOffState StartedBackoffStateWithNoRetries()
+  {
+    // GIVEN
+    given(backOff.start()).willReturn(backOffExecution);
+    given(backOffExecution.nextBackOff()).willReturn(BackOffExecution.STOP);
+
+    // WHEN
+    BackOffState backOffState = new BackOffState(ID, clock, backOff);
+    backOffState.start(OFFSET);
+
+    return backOffState;
+  }
+
+  @Test
+  @DisplayName("A started BackOffState with no retries is not waiting for a retry")
+  void StartedBackOffStateWithNoRetriesIsNotWaitingForRetry()
+  {
+    // GIVEN
+    BackOffState backOffState = StartedBackoffStateWithNoRetries();
+
+    // WHEN
+
+    // THEN
+    assertThat(backOffState.isWaitingForNextRetry()).isFalse();
+  }
+
+  @Test
+  @DisplayName("A started BackOffState with no retries is started")
+  void StartedBackOffStateWithNoRetriesIsStarted()
+  {
+    // GIVEN
+    BackOffState backOffState = StartedBackoffStateWithNoRetries();
+
+    // WHEN
+
+    // THEN
+    assertThat(backOffState.isStarted(OFFSET)).isTrue();
+  }
+
+  @Test
+  @DisplayName("A started BackOffState with no retries is completed")
+  void StartedBackOffStateWithNoRetriesIsCompleted()
+  {
+    // GIVEN
+    BackOffState backOffState = StartedBackoffStateWithNoRetries();
+
+    // WHEN
+
+    // THEN
+    assertThat(backOffState.isCompleted(OFFSET)).isTrue();
+  }
+
+
+  private BackOffState StartedBackoffStateWithRetries()
+  {
+    // GIVEN
+    given(clock.instant()).willReturn(NOW);
+    given(backOff.start()).willReturn(backOffExecution);
+    given(backOffExecution.nextBackOff()).willReturn(BACK_OFF);
+
+    // WHEN
+    BackOffState backOffState = new BackOffState(ID, clock, backOff);
+    backOffState.start(OFFSET);
+
+    return backOffState;
+  }
+
+  @Test
+  @DisplayName("A started BackOffState is waiting for a retry if the time is not due")
+  void StartedBackOffStateIsWaitingForRetryIfTimeIsNotDue()
+  {
+    // GIVEN
+    BackOffState backOffState = StartedBackoffStateWithRetries();
+    given(clock.instant()).willReturn(NOW.plusMillis(BACK_OFF));
+
+    // WHEN
+    boolean result = backOffState.isWaitingForNextRetry();
+
+    // THEN
+    assertThat(result).isTrue();
+  }
+
+  @Test
+  @DisplayName("A started BackOffState is started if the time is not due")
+  void StartedBackOffStateIsStarted()
+  {
+    // GIVEN
+    BackOffState backOffState = StartedBackoffStateWithRetries();
+    given(clock.instant()).willReturn(NOW.plusMillis(BACK_OFF));
+
+    // WHEN
+    backOffState.isWaitingForNextRetry();
+
+    // THEN
+    assertThat(backOffState.isStarted(OFFSET)).isTrue();
+  }
+
+  @Test
+  @DisplayName("A started BackOffState is not completed if the time is not due")
+  void StartedBackOffStateIsNotCompletedIfTimeIsNotDue()
+  {
+    // GIVEN
+    BackOffState backOffState = StartedBackoffStateWithRetries();
+    given(clock.instant()).willReturn(NOW.plusMillis(BACK_OFF));
+
+    // WHEN
+    backOffState.isWaitingForNextRetry();
+
+    // THEN
+    assertThat(backOffState.isCompleted(OFFSET)).isFalse();
+  }
+
+  @Test
+  @DisplayName("A started BackOffState is not waiting for a retry if the time is due but the retry not yet completed")
+  void StartedBackOffStateIsNotWaitingForRetryIfTheTimeIsDueButRetryNotCompleted()
+  {
+    // GIVEN
+    BackOffState backOffState = StartedBackoffStateWithRetries();
+    given(clock.instant()).willReturn(NOW.plusMillis(BACK_OFF).plusMillis(1));
+    given(backOffExecution.nextBackOff()).willReturn(BACK_OFF);
+
+    // WHEN
+    boolean result = backOffState.isWaitingForNextRetry();
+
+    // THEN
+    assertThat(result).isFalse();
+  }
+
+  @Test
+  @DisplayName("A started BackOffState is started if the time is due but the retry not yet completed")
+  void StartedBackOffStateIsStartedIfTheTimeIsDueButRetryNotCompleted()
+  {
+    // GIVEN
+    BackOffState backOffState = StartedBackoffStateWithRetries();
+    given(clock.instant()).willReturn(NOW.plusMillis(BACK_OFF).plusMillis(1));
+    given(backOffExecution.nextBackOff()).willReturn(BACK_OFF);
+
+    // WHEN
+    backOffState.isWaitingForNextRetry();
+
+    // THEN
+    assertThat(backOffState.isStarted(OFFSET)).isTrue();
+  }
+
+  @Test
+  @DisplayName("A started BackOffState is not completed if the time is due but the retry not yet completed")
+  void StartedBackOffStateIsNotCompletedIfTheTimeIsDueButRetryNotCompleted()
+  {
+    // GIVEN
+    BackOffState backOffState = StartedBackoffStateWithRetries();
+    given(clock.instant()).willReturn(NOW.plusMillis(BACK_OFF).plusMillis(1));
+    given(backOffExecution.nextBackOff()).willReturn(BACK_OFF);
+
+    // WHEN
+    backOffState.isWaitingForNextRetry();
+
+    // THEN
+    assertThat(backOffState.isCompleted(OFFSET)).isFalse();
+  }
+
+  @Test
+  @DisplayName("A started BackOffState is not waiting for a retry if the time is due and the retry is completed")
+  void StartedBackOffStateIsNotWaitingForRetryIfTheTimeIsDueAndRetryIsCompleted()
+  {
+    // GIVEN
+    BackOffState backOffState = StartedBackoffStateWithRetries();
+    given(clock.instant()).willReturn(NOW.plusMillis(BACK_OFF).plusMillis(1));
+    given(backOffExecution.nextBackOff()).willReturn(BackOffExecution.STOP);
+
+    // WHEN
+    boolean result = backOffState.isWaitingForNextRetry();
+
+    // THEN
+    assertThat(result).isFalse();
+  }
+
+  @Test
+  @DisplayName("A started BackOffState is started if the time is due and the retry is completed")
+  void StartedBackOffStateIsStartedIfTheTimeIsDueAndRetryIsCompleted()
+  {
+    // GIVEN
+    BackOffState backOffState = StartedBackoffStateWithRetries();
+    given(clock.instant()).willReturn(NOW.plusMillis(BACK_OFF).plusMillis(1));
+    given(backOffExecution.nextBackOff()).willReturn(BackOffExecution.STOP);
+
+    // WHEN
+    backOffState.isWaitingForNextRetry();
+
+    // THEN
+    assertThat(backOffState.isStarted(OFFSET)).isTrue();
+  }
+
+  @Test
+  @DisplayName("A started BackOffState is completed if the time is due and the retry is completed")
+  void StartedBackOffStateIsCompletedIfTheTimeIsDueAndRetryIsCompleted()
+  {
+    // GIVEN
+    BackOffState backOffState = StartedBackoffStateWithRetries();
+    given(clock.instant()).willReturn(NOW.plusMillis(BACK_OFF).plusMillis(1));
+    given(backOffExecution.nextBackOff()).willReturn(BackOffExecution.STOP);
+
+    // WHEN
+    backOffState.isWaitingForNextRetry();
+
+    // THEN
+    assertThat(backOffState.isCompleted(OFFSET)).isTrue();
+  }
+
+  @Test
+  @DisplayName("A started BackOffState is not waiting for a retry after a reset")
+  void StartedBackOffStateIsNotWaitingForRetryAfterReset()
+  {
+    // GIVEN
+    BackOffState backOffState = StartedBackoffStateWithRetries();
+
+    // WHEN
+    backOffState.reset();
+
+    // THEN
+    assertThat(backOffState.isWaitingForNextRetry()).isFalse();
+  }
+
+  @Test
+  @DisplayName("A started BackOffState is not started after a reset")
+  void StartedBackOffStateIsNotStartedAfterReset()
+  {
+    // GIVEN
+    BackOffState backOffState = StartedBackoffStateWithRetries();
+
+    // WHEN
+    backOffState.reset();
+
+    // THEN
+    assertThat(backOffState.isStarted(OFFSET)).isFalse();
+  }
+}
index 590c9cd..1f9713f 100644 (file)
@@ -12,6 +12,9 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.DisplayName;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.ValueSource;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
@@ -20,7 +23,10 @@ import org.springframework.boot.test.context.TestConfiguration;
 import org.springframework.context.annotation.Bean;
 import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.kafka.test.context.EmbeddedKafka;
+import org.springframework.util.backoff.BackOff;
+import org.springframework.util.backoff.FixedBackOff;
 
+import java.time.Clock;
 import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
@@ -29,8 +35,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
-import static de.juplo.kafka.ExampleConsumerTest.NUM_PARTITIONS;
-import static de.juplo.kafka.ExampleConsumerTest.TOPIC;
+import static de.juplo.kafka.ExampleConsumerTest.*;
+import static org.assertj.core.api.Assertions.assertThat;
 
 
 @SpringBootTest(
@@ -41,16 +47,24 @@ import static de.juplo.kafka.ExampleConsumerTest.TOPIC;
   },
   properties = {
     "juplo.bootstrap-server=${spring.embedded.kafka.brokers}",
+    "juplo.consumer.poll-request-timeout=" + POLL_REQUEST_TIMEOUT_MS + "ms",
+    "juplo.consumer.max-poll-interval=" + MAX_POLL_INTERVALL_MS + "ms",
+    "juplo.consumer.max-time-per-record=" + MAX_TIME_PER_RECORD_MS + "ms",
+    "juplo.consumer.fetch-max-wait=" + FETCH_MAX_WAIT_MS + "ms",
+    "juplo.consumer.min-slack-per-poll-interval=" + MIN_SLACK_PER_POLL_INTERVAL_MS + "ms",
+    "juplo.consumer.num-retries=" + NUM_RETRIES,
     "spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.ByteArraySerializer",
     "logging.level.de.juplo.kafka=TRACE",
   })
 @EmbeddedKafka(topics = TOPIC, partitions = NUM_PARTITIONS)
 public class ExampleConsumerTest
 {
-  @DisplayName("All messages are consumed")
+  @DisplayName("All messages are consumed as expected")
   @Test
   void testOnlyValidMessages()
   {
+    createExampleConsumer();
+
     sendValidMessage(0);
     sendValidMessage(1);
     sendValidMessage(2);
@@ -75,13 +89,50 @@ public class ExampleConsumerTest
     Awaitility
       .await("All messages are consumed")
       .atMost(Duration.ofSeconds(5))
-      .until(() -> mockRecordHandler.getNumMessagesHandled() == 20);
+      .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(20));
+  }
+
+  @DisplayName("Delayed messages are consumed as expected")
+  @ParameterizedTest(name = "delay for message-consumption: {0}")
+  @ValueSource(ints = { 10, 25, 50, 75, 100 })
+  void testOnlyValidMessagesButAllDelayed(int delay)
+  {
+    createExampleConsumer();
+    mockRecordHandler.normalRecordHandlingDelay = Duration.ofMillis(delay);
+
+    sendValidMessage(0);
+    sendValidMessage(1);
+    sendValidMessage(2);
+    sendValidMessage(3);
+    sendValidMessage(4);
+    sendValidMessage(5);
+    sendValidMessage(6);
+    sendValidMessage(7);
+    sendValidMessage(8);
+    sendValidMessage(9);
+    sendValidMessage(0);
+    sendValidMessage(1);
+    sendValidMessage(2);
+    sendValidMessage(3);
+    sendValidMessage(4);
+    sendValidMessage(5);
+    sendValidMessage(6);
+    sendValidMessage(7);
+    sendValidMessage(8);
+    sendValidMessage(9);
+
+    Awaitility
+      .await("All messages are consumed")
+      .atMost(Duration.ofSeconds(5))
+      .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(20));
   }
 
   @DisplayName("A deserialization exception is skipped and all valid messages are consumed")
   @Test
   void testDeserializationException()
   {
+    createExampleConsumer();
+
     sendValidMessage(0);
     sendValidMessage(1);
     sendValidMessage(2);
@@ -106,13 +157,15 @@ public class ExampleConsumerTest
     Awaitility
       .await("All valid messages are consumed")
       .atMost(Duration.ofSeconds(15))
-      .until(() -> mockRecordHandler.getNumMessagesHandled() == 19);
+      .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(19));
   }
 
   @DisplayName("A message, that triggers an unexpected exception in the domain-logic, exits the application")
   @Test
   void testUnexpectedDomainError() throws Exception
   {
+    createExampleConsumer();
+
     sendValidMessage(0);
     sendValidMessage(1);
     sendValidMessage(2);
@@ -138,16 +191,324 @@ public class ExampleConsumerTest
       .await("The ConsumerRunnable is exited by an unexpected exception")
       .atMost(Duration.ofSeconds(5))
       .pollInterval(Duration.ofMillis(250))
-      .until(() -> isTerminatedExceptionally.get());
+      .untilAsserted(() -> assertThat(isTerminatedExceptionally.get()).isTrue());
+  }
+
+  @DisplayName("A message, that triggers an non-retryable exception in the domain-logic, is skipped and all other messages are consumed")
+  @Test
+  void testNonRetryableDomainError() throws Exception
+  {
+    createExampleConsumer();
+
+    sendValidMessage(0);
+    sendValidMessage(1);
+    sendValidMessage(2);
+    sendMessageThatTriggersNonRetriableExceptionInDomain(3);
+    sendValidMessage(4);
+    sendValidMessage(5);
+    sendValidMessage(6);
+    sendValidMessage(7);
+    sendValidMessage(8);
+    sendValidMessage(9);
+    sendValidMessage(0);
+    sendValidMessage(1);
+    sendValidMessage(2);
+    sendValidMessage(3);
+    sendValidMessage(4);
+    sendValidMessage(5);
+    sendValidMessage(6);
+    sendValidMessage(7);
+    sendValidMessage(8);
+    sendValidMessage(9);
+
+    Awaitility
+      .await("All other valid messages are consumed")
+      .atMost(Duration.ofSeconds(15))
+      .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(19));
+  }
+
+  @DisplayName("A message, that triggers a retryable exception in the domain-logic, is retried and all messages are eventually consumed")
+  @ParameterizedTest(name = "Number of failures for the 1. message in partition 3: {0}")
+  @ValueSource(ints = { 1, 2, 3, 4, 5, 6 })
+  void testOneMessageCausesRetryableDomainErrors(int numFailures)
+  {
+    createExampleConsumer();
+
+    sendValidMessage(0);
+    sendValidMessage(1);
+    sendValidMessage(2);
+    sendMessageThatTriggersRetriableExceptionInDomain(3,numFailures);
+    sendValidMessage(4);
+    sendValidMessage(5);
+    sendValidMessage(6);
+    sendValidMessage(7);
+    sendValidMessage(8);
+    sendValidMessage(9);
+    sendValidMessage(0);
+    sendValidMessage(1);
+    sendValidMessage(2);
+    sendValidMessage(3);
+    sendValidMessage(4);
+    sendValidMessage(5);
+    sendValidMessage(6);
+    sendValidMessage(7);
+    sendValidMessage(8);
+    sendValidMessage(9);
+
+    Awaitility
+      .await("All messages are eventually consumed")
+      .atMost(Duration.ofSeconds(15))
+      .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(20));
+  }
+
+  @DisplayName("All messages on a partition are delayed and a message, that triggers a retryable exception in the domain-logic, is retried and all messages are eventually consumed")
+  @ParameterizedTest(name = "Delay for normal messages: {0}ms")
+  @ValueSource(ints = { 10, 20, 50, 100, 150, 200 })
+  void testOneMessageCausesRetryableDomainErrorsWhileAllMessagesAreDelayed(int delay)
+  {
+    createExampleConsumer();
+    mockRecordHandler.normalRecordHandlingDelay = Duration.ofMillis(delay);
+    mockRecordHandler.exceptionalRecordHandlingDelay = Duration.ofMillis(100);
+
+    sendValidMessage(3);
+    sendValidMessage(3);
+    sendValidMessage(3);
+    sendValidMessage(3);
+    sendValidMessage(3);
+    sendValidMessage(3);
+    sendValidMessage(3);
+    sendValidMessage(3);
+    sendValidMessage(3);
+    sendValidMessage(3);
+    sendValidMessage(3);
+    sendValidMessage(3);
+    sendValidMessage(3);
+    sendValidMessage(3);
+    sendValidMessage(3);
+    sendValidMessage(3);
+    sendValidMessage(3);
+    sendValidMessage(3);
+    sendMessageThatTriggersRetriableExceptionInDomain(3, 1);
+    sendValidMessage(3);
+
+    Awaitility
+      .await("All messages are eventually consumed")
+      .atMost(Duration.ofSeconds(15))
+      .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(20));
+  }
+
+  @DisplayName("A message, that triggers a retryable exception in the domain-logic, but fails too often, is skipped and all other messages are eventually consumed")
+  @Test
+  void testOneMessageCausesTooManyRetryableDomainErrors()
+  {
+    createExampleConsumer();
+
+    sendValidMessage(0);
+    sendValidMessage(1);
+    sendValidMessage(2);
+    sendMessageThatTriggersRetriableExceptionInDomain(3,66);
+    sendValidMessage(4);
+    sendValidMessage(5);
+    sendValidMessage(6);
+    sendValidMessage(7);
+    sendValidMessage(8);
+    sendValidMessage(9);
+    sendValidMessage(0);
+    sendValidMessage(1);
+    sendValidMessage(2);
+    sendValidMessage(3);
+    sendValidMessage(4);
+    sendValidMessage(5);
+    sendValidMessage(6);
+    sendValidMessage(7);
+    sendValidMessage(8);
+    sendValidMessage(9);
+
+    Awaitility
+      .await("All other messages are eventually consumed")
+      .atMost(Duration.ofSeconds(15))
+      .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(19));
+  }
+
+  @DisplayName("Several messages are triggering retryable exception in one partition, some of them fail so often, that they are skipped and all successful messages are eventually consumed")
+  @ParameterizedTest(name = "Delay for errors: {0}ms")
+  @ValueSource(ints = { 10, 20, 50, 100, 150, 200 })
+  void testSeveralMessagesCausesRetryableDomainErrorsInOnePartition(int delay)
+  {
+    createExampleConsumer();
+    mockRecordHandler.exceptionalRecordHandlingDelay = Duration.ofMillis(delay);
+
+    sendValidMessage(3);
+    sendMessageThatTriggersRetriableExceptionInDomain(3, 4);
+    sendValidMessage(3);
+    sendValidMessage(3);
+    sendMessageThatTriggersRetriableExceptionInDomain(3, 6);
+    sendMessageThatTriggersRetriableExceptionInDomain(3, 1);
+    sendValidMessage(3);
+    sendMessageThatTriggersRetriableExceptionInDomain(3, 66);
+    sendMessageThatTriggersRetriableExceptionInDomain(3, 5);
+    sendValidMessage(3);
+    sendValidMessage(3);
+    sendMessageThatTriggersRetriableExceptionInDomain(3, 66);
+    sendMessageThatTriggersRetriableExceptionInDomain(3, 6);
+    sendMessageThatTriggersRetriableExceptionInDomain(3, 66);
+    sendMessageThatTriggersRetriableExceptionInDomain(3, 3);
+    sendValidMessage(3);
+    sendMessageThatTriggersRetriableExceptionInDomain(3, 66);
+    sendValidMessage(3);
+    sendMessageThatTriggersRetriableExceptionInDomain(3, 1);
+    sendValidMessage(3);
+
+    Awaitility
+      .await("All other messages are eventually consumed")
+      .atMost(Duration.ofSeconds(15))
+      .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(16));
+  }
+
+  @DisplayName("Three messages, that trigger retryable exceptions in the domain-logic, are retried and all messages are eventually consumed")
+  @ParameterizedTest(name = "Number of failures for the 1. message in partition 3: {0}, number of failures for the 1. message in partition 6: {1}, number of failures for the 2. message in partition 6: {2}")
+  @CsvSource({ "1,1,1", "6,3,4", "4,5,2", "1,2,3", "6,6,6" })
+  void testThreeMessagesCauseRetryableDomainErrors(
+    int numFailuresForMessageA,
+    int numFailuresForMessageB,
+    int numFailuresForMessageC)
+  {
+    createExampleConsumer();
+
+    sendValidMessage(0);
+    sendValidMessage(1);
+    sendValidMessage(2);
+    sendMessageThatTriggersRetriableExceptionInDomain(3,numFailuresForMessageA);
+    sendValidMessage(4);
+    sendValidMessage(5);
+    sendMessageThatTriggersRetriableExceptionInDomain(6,numFailuresForMessageB);
+    sendValidMessage(7);
+    sendValidMessage(8);
+    sendValidMessage(9);
+    sendValidMessage(0);
+    sendValidMessage(1);
+    sendValidMessage(2);
+    sendValidMessage(3);
+    sendValidMessage(4);
+    sendValidMessage(5);
+    sendMessageThatTriggersRetriableExceptionInDomain(6,numFailuresForMessageC);
+    sendValidMessage(7);
+    sendValidMessage(8);
+    sendValidMessage(9);
+    sendValidMessage(0);
+    sendValidMessage(1);
+    sendValidMessage(2);
+    sendValidMessage(3);
+    sendValidMessage(4);
+    sendValidMessage(5);
+    sendValidMessage(6);
+    sendValidMessage(7);
+    sendValidMessage(8);
+    sendValidMessage(9);
+
+    Awaitility
+      .await("All messages are eventually consumed")
+      .atMost(Duration.ofSeconds(20))
+      .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(30));
+  }
+
+  @DisplayName("Three messages, that trigger retryable exceptions in the domain-logic, are retried, but one of them fails too often and is skipped, still all other messages are eventually consumed")
+  @ParameterizedTest(name = "Number of failures for the 1. message in partition 3: {0}, number of failures for the 1. message in partition 6: {1}, number of failures for the 2. message in partition 6: {2}")
+  @CsvSource({ "66,3,4", "4,66,2", "1,2,66" })
+  void testThreeMessagesCauseRetryableDomainErrorsAndOneFailsTooOften(
+    int numFailuresForMessageA,
+    int numFailuresForMessageB,
+    int numFailuresForMessageC)
+  {
+    createExampleConsumer();
+
+    sendValidMessage(0);
+    sendValidMessage(1);
+    sendValidMessage(2);
+    sendMessageThatTriggersRetriableExceptionInDomain(3,numFailuresForMessageA);
+    sendValidMessage(4);
+    sendValidMessage(5);
+    sendMessageThatTriggersRetriableExceptionInDomain(6,numFailuresForMessageB);
+    sendValidMessage(7);
+    sendValidMessage(8);
+    sendValidMessage(9);
+    sendValidMessage(0);
+    sendValidMessage(1);
+    sendValidMessage(2);
+    sendValidMessage(3);
+    sendValidMessage(4);
+    sendValidMessage(5);
+    sendMessageThatTriggersRetriableExceptionInDomain(6,numFailuresForMessageC);
+    sendValidMessage(7);
+    sendValidMessage(8);
+    sendValidMessage(9);
+    sendValidMessage(0);
+    sendValidMessage(1);
+    sendValidMessage(2);
+    sendValidMessage(3);
+    sendValidMessage(4);
+    sendValidMessage(5);
+    sendValidMessage(6);
+    sendValidMessage(7);
+    sendValidMessage(8);
+    sendValidMessage(9);
+
+    Awaitility
+      .await("All other messages are eventually consumed")
+      .atMost(Duration.ofSeconds(20))
+      .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(29));
+  }
+
+  @DisplayName("A message, that triggers a retryable exception in the domain-logic, is retried 3 times with a fixed back-of and all messages are eventually consumed")
+  @ParameterizedTest(name = "Back-Off millis: {0}")
+  @ValueSource(ints = { 100, 250, 500, 1000 })
+  void testOneMessageIsRetriedWithFixedBackOff(int backOffMillis)
+  {
+    BackOff backOff = new FixedBackOff(backOffMillis, 3);
+    createExampleConsumer(backOff);
+
+    sendValidMessage(0);
+    sendValidMessage(1);
+    sendValidMessage(2);
+    sendMessageThatTriggersRetriableExceptionInDomain(3, 3);
+    sendValidMessage(4);
+    sendValidMessage(5);
+    sendValidMessage(6);
+    sendValidMessage(7);
+    sendValidMessage(8);
+    sendValidMessage(9);
+    sendValidMessage(0);
+    sendValidMessage(1);
+    sendValidMessage(2);
+    sendValidMessage(3);
+    sendValidMessage(4);
+    sendValidMessage(5);
+    sendValidMessage(6);
+    sendValidMessage(7);
+    sendValidMessage(8);
+    sendValidMessage(9);
+
+    Awaitility
+      .await("All messages are eventually consumed")
+      .atMost(Duration.ofSeconds(15))
+      .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(20));
   }
 
 
   static final String ID = "TEST";
   static final String TOPIC = "ExampleConsumerTest_TEST";
   static final int NUM_PARTITIONS = 10;
+  static final int NUM_RETRIES = 6;
+  static final int POLL_REQUEST_TIMEOUT_MS = 50;
+  static final int MAX_POLL_INTERVALL_MS = 500;
+  static final int MAX_TIME_PER_RECORD_MS = 100;
+  static final int FETCH_MAX_WAIT_MS = 50;
+  static final int MIN_SLACK_PER_POLL_INTERVAL_MS = 100;
 
   @Autowired
   KafkaTemplate<String, byte[]> kafkaTemplate;
+  @Autowired ApplicationProperties properties;
+  @Autowired Clock clock;
 
   final LongSerializer serializer = new LongSerializer();
   final long[] currentOffsets = new long[NUM_PARTITIONS];
@@ -160,8 +521,12 @@ public class ExampleConsumerTest
   ExampleConsumer exampleConsumer;
 
 
-  @BeforeEach
-  void createExampleConsumer(@Autowired ApplicationProperties properties)
+  void createExampleConsumer()
+  {
+    createExampleConsumer(new FixedBackOff(0l, properties.getConsumerProperties().getNumRetries()));
+  }
+
+  void createExampleConsumer(BackOff backOff)
   {
     ApplicationConfiguration configuration = new ApplicationConfiguration();
     Consumer<String, Long> consumer = configuration.kafkaConsumer(properties);
@@ -171,9 +536,22 @@ public class ExampleConsumerTest
       TOPIC,
       consumer,
       mockRecordHandler,
+      clock,
+      properties.getConsumerProperties().getPollRequestTimeout(),
+      properties.getConsumerProperties().getMaxPollInterval(),
+      properties.getConsumerProperties().getMaxTimePerRecord(),
+      properties.getConsumerProperties().getMinSlackPerPollInterval(),
+      backOff,
       () -> isTerminatedExceptionally.set(true));
   }
 
+  @BeforeEach
+  void resetParameters()
+  {
+    mockRecordHandler.normalRecordHandlingDelay = Duration.ofMillis(0);
+    mockRecordHandler.exceptionalRecordHandlingDelay = Duration.ofMillis(0);
+  }
+
   @AfterEach
   void resetSetup(@Autowired AdminClient adminClient) throws InterruptedException, ExecutionException
   {
@@ -218,6 +596,16 @@ public class ExampleConsumerTest
     send(partition, VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION);
   }
 
+  private void sendMessageThatTriggersNonRetriableExceptionInDomain(int partition)
+  {
+    send(partition, serializer.serialize(TOPIC,(long)VALUE_THAT_TRIGGERS_NONRETRIABLEEXCEPTION));
+  }
+
+  private void sendMessageThatTriggersRetriableExceptionInDomain(int partition, int numFailures)
+  {
+    send(partition, serializer.serialize(TOPIC,(long)VALUE_THAT_TRIGGERS_RETRIABLEEXCEPTION * numFailures));
+  }
+
   private void send(int partition, long message)
   {
     send(partition, serializer.serialize(TOPIC, message));
@@ -237,6 +625,9 @@ public class ExampleConsumerTest
 
 
   public final static int VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION = -1;
+  public final static int VALUE_THAT_TRIGGERS_RETRIABLEEXCEPTION = -2;
+  public final static int VALUE_THAT_TRIGGERS_NONRETRIABLEEXCEPTION = -3;
+
 
 
   @TestConfiguration
@@ -249,5 +640,11 @@ public class ExampleConsumerTest
       properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBroker);
       return AdminClient.create(properties);
     }
+
+    @Bean
+    Clock clock()
+    {
+      return Clock.systemDefaultZone();
+    }
   }
 }
index 9f540f0..c294256 100644 (file)
 package de.juplo.kafka;
 
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 
-import static de.juplo.kafka.ExampleConsumerTest.VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+
+import static de.juplo.kafka.ExampleConsumerTest.*;
 
 
-@RequiredArgsConstructor
 @Slf4j
 public class MockRecordHandler implements RecordHandler<String, Long>
 {
-  @Getter
+  private final Map<OffsetInPartition, Integer> retriableErrors = new HashMap<>();
+
+  Duration normalRecordHandlingDelay;
+  Duration exceptionalRecordHandlingDelay;
+
   private int numMessagesHandled = 0;
 
+  public int getNumMessagesHandled()
+  {
+    return numMessagesHandled;
+  }
+
   @Override
   public void handleRecord(
     String topic,
     Integer partition,
     Long offset,
     String key,
-    Long value)
+    Long value) throws RetriableErrorException, NonRetriableErrorException
   {
     if (value != null && value < 0)
     {
-      generateError(value);
+      generateError(new OffsetInPartition(offset, partition), value.intValue());
     }
 
+    sleep(normalRecordHandlingDelay);
     numMessagesHandled++;
     log.trace("Handled {} messages so far", numMessagesHandled);
   }
 
-  private void generateError(long value)
+  private void generateError(
+    OffsetInPartition offset,
+    int value) throws RetriableErrorException, NonRetriableErrorException
   {
     if (value == VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION)
     {
       throw new RuntimeException("Unexpected application error!");
     }
 
-    log.info("Not specifically mapped error: {}", value);
+    if (value == VALUE_THAT_TRIGGERS_NONRETRIABLEEXCEPTION)
+    {
+      throw new NonRetriableErrorException("Non-Retryable application error!");
+    }
+
+    if ((float)value % (float)VALUE_THAT_TRIGGERS_RETRIABLEEXCEPTION == 0f)
+    {
+      int totalOccurrences = value / VALUE_THAT_TRIGGERS_RETRIABLEEXCEPTION;
+      int occurrence = retriableErrors.compute(
+        offset,
+        (k, v) ->
+        {
+          if (v == null)
+          {
+            v = totalOccurrences;
+          }
+          else
+          {
+            v--;
+          }
+
+          return v;
+        });
+
+      if (occurrence <= 0)
+      {
+        retriableErrors.remove(offset);
+      }
+      else
+      {
+        log.debug(
+          "Simulating occurrence #{} of {} for a retryable error at offset {} in partition {}",
+          totalOccurrences - occurrence + 1,
+          totalOccurrences,
+          offset.offset,
+          offset.partition);
+        sleep(exceptionalRecordHandlingDelay);
+        throw new RetriableErrorException("Retryable application error! Occurrence #" + (totalOccurrences - occurrence + 1));
+      }
+
+      log.info("Simulating a resolved retryable error after {} occurrences of the error", totalOccurrences);
+    }
+    else
+    {
+      log.warn("Not specifically mapped error: {}", value);
+    }
+  }
+
+  private void sleep(Duration duration)
+  {
+    try
+    {
+      Thread.sleep(duration);
+    }
+    catch(InterruptedException e)
+    {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    }
   }
 
   public void clear()
   {
+    retriableErrors.clear();
     numMessagesHandled = 0;
   }
+
+
+  private static record OffsetInPartition(long offset, int partition) {}
 }
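
For orientation, the value encoding that `sendMessageThatTriggersRetriableExceptionInDomain` uses and `generateError()` decodes, spelled out with one concrete value. The constant `-2` is `VALUE_THAT_TRIGGERS_RETRIABLEEXCEPTION` from `ExampleConsumerTest`; the surrounding class and the printed output are illustrative only.

package de.juplo.kafka;

// Worked example of the retryable-error encoding used by the tests (not part of the commit).
public class RetriableErrorEncodingExample
{
  public static void main(String[] args)
  {
    final int VALUE_THAT_TRIGGERS_RETRIABLEEXCEPTION = -2;
    int numFailures = 3;

    // What the test producer writes for "fail 3 times, then succeed":
    long value = (long) VALUE_THAT_TRIGGERS_RETRIABLEEXCEPTION * numFailures; // -6

    // What MockRecordHandler derives on the consuming side:
    int totalOccurrences = (int) value / VALUE_THAT_TRIGGERS_RETRIABLEEXCEPTION; // 3

    // generateError() throws a RetriableErrorException for the first 3 deliveries of this
    // offset and lets the 4th delivery succeed -- exactly what the retry logic must survive.
    System.out.println(value + " encodes " + totalOccurrences + " simulated retryable failures");
  }
}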