Test für viele Fehler in einer Partition in `BackOffStateTest` ergänzt
authorKai Moritz <kai@juplo.de>
Fri, 10 Jan 2025 23:41:42 +0000 (00:41 +0100)
committerKai Moritz <kai@juplo.de>
Fri, 24 Jan 2025 19:42:18 +0000 (20:42 +0100)
src/test/java/de/juplo/kafka/ExampleConsumerTest.java

index 35be484..1f9713f 100644 (file)
@@ -50,6 +50,7 @@ import static org.assertj.core.api.Assertions.assertThat;
     "juplo.consumer.poll-request-timeout=" + POLL_REQUEST_TIMEOUT_MS + "ms",
     "juplo.consumer.max-poll-interval=" + MAX_POLL_INTERVALL_MS + "ms",
     "juplo.consumer.max-time-per-record=" + MAX_TIME_PER_RECORD_MS + "ms",
+    "juplo.consumer.fetch-max-wait=" + FETCH_MAX_WAIT_MS + "ms",
     "juplo.consumer.min-slack-per-poll-interval=" + MIN_SLACK_PER_POLL_INTERVAL_MS + "ms",
     "juplo.consumer.num-retries=" + NUM_RETRIES,
     "spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.ByteArraySerializer",
@@ -329,6 +330,41 @@ public class ExampleConsumerTest
       .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(19));
   }
 
+  @DisplayName("Several messages are triggering retryable exception in one partition, some of them fail so often, that they are skipped and all successful messages are eventually consumed")
+  @ParameterizedTest(name = "Delay for errors: {0}ms")
+  @ValueSource(ints = { 10, 20, 50, 100, 150, 200 })
+  void testSeveralMessagesCausesRetryableDomainErrorsInOnePartition(int delay)
+  {
+    createExampleConsumer();
+    mockRecordHandler.exceptionalRecordHandlingDelay = Duration.ofMillis(delay);
+
+    sendValidMessage(3);
+    sendMessageThatTriggersRetriableExceptionInDomain(3, 4);
+    sendValidMessage(3);
+    sendValidMessage(3);
+    sendMessageThatTriggersRetriableExceptionInDomain(3, 6);
+    sendMessageThatTriggersRetriableExceptionInDomain(3, 1);
+    sendValidMessage(3);
+    sendMessageThatTriggersRetriableExceptionInDomain(3, 66);
+    sendMessageThatTriggersRetriableExceptionInDomain(3, 5);
+    sendValidMessage(3);
+    sendValidMessage(3);
+    sendMessageThatTriggersRetriableExceptionInDomain(3, 66);
+    sendMessageThatTriggersRetriableExceptionInDomain(3, 6);
+    sendMessageThatTriggersRetriableExceptionInDomain(3, 66);
+    sendMessageThatTriggersRetriableExceptionInDomain(3, 3);
+    sendValidMessage(3);
+    sendMessageThatTriggersRetriableExceptionInDomain(3, 66);
+    sendValidMessage(3);
+    sendMessageThatTriggersRetriableExceptionInDomain(3, 1);
+    sendValidMessage(3);
+
+    Awaitility
+      .await("All other messages are eventually consumed")
+      .atMost(Duration.ofSeconds(15))
+      .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(16));
+  }
+
   @DisplayName("Three messages, that trigger retryable exceptions in the domain-logic, are retried and all messages are eventually consumed")
   @ParameterizedTest(name = "Number of failures for the 1. message in partition 3: {0}, number of failures for the 1. message in partition 6: {1}, number of failures for the 2. message in partition 6: {2}")
   @CsvSource({ "1,1,1", "6,3,4", "4,5,2", "1,2,3", "6,6,6" })
@@ -466,6 +502,7 @@ public class ExampleConsumerTest
   static final int POLL_REQUEST_TIMEOUT_MS = 50;
   static final int MAX_POLL_INTERVALL_MS = 500;
   static final int MAX_TIME_PER_RECORD_MS = 100;
+  static final int FETCH_MAX_WAIT_MS = 50;
   static final int MIN_SLACK_PER_POLL_INTERVAL_MS = 100;
 
   @Autowired