GREEN: Erwartetes Verhalten für unterschiedliche Delays definiert
authorKai Moritz <kai@juplo.de>
Sun, 22 Dec 2024 19:57:31 +0000 (20:57 +0100)
committerKai Moritz <kai@juplo.de>
Thu, 6 Feb 2025 17:04:39 +0000 (18:04 +0100)
src/main/java/de/juplo/kafka/ExampleConsumer.java
src/test/java/de/juplo/kafka/ExampleConsumerTest.java
src/test/java/de/juplo/kafka/MockRecordHandler.java

index 89c75f1..3a6695e 100644 (file)
@@ -119,6 +119,7 @@ public class ExampleConsumer implements ConsumerRebalanceListener, Runnable
 
               Instant now = clock.instant();
               Duration timeLeft = Duration.between(now, deadline);
+              log.trace("{} - Time left for current poll: {}", id, timeLeft);
 
               if (timeLeft.minus(minTimeForNextRecord).isNegative())
               {
index 4394e9a..8cf1e22 100644 (file)
@@ -56,7 +56,7 @@ import static de.juplo.kafka.ExampleConsumerTest.*;
 @EmbeddedKafka(topics = TOPIC, partitions = NUM_PARTITIONS)
 public class ExampleConsumerTest
 {
-  @DisplayName("All messages are consumed")
+  @DisplayName("All messages are consumed as expected")
   @Test
   void testOnlyValidMessages()
   {
@@ -89,6 +89,41 @@ public class ExampleConsumerTest
       .until(() -> mockRecordHandler.getNumMessagesHandled() == 20);
   }
 
+  @DisplayName("Delayed messages are consumed as expected")
+  @ParameterizedTest(name = "delay for message-consumption: {0}")
+  @ValueSource(ints = { 10, 25, 50, 75, 100 })
+  void testOnlyValidMessagesButAllDelayed(int delay)
+  {
+    createExampleConsumer();
+    mockRecordHandler.normalRecordHandlingDelay = Duration.ofMillis(delay);
+
+    sendValidMessage(0);
+    sendValidMessage(1);
+    sendValidMessage(2);
+    sendValidMessage(3);
+    sendValidMessage(4);
+    sendValidMessage(5);
+    sendValidMessage(6);
+    sendValidMessage(7);
+    sendValidMessage(8);
+    sendValidMessage(9);
+    sendValidMessage(0);
+    sendValidMessage(1);
+    sendValidMessage(2);
+    sendValidMessage(3);
+    sendValidMessage(4);
+    sendValidMessage(5);
+    sendValidMessage(6);
+    sendValidMessage(7);
+    sendValidMessage(8);
+    sendValidMessage(9);
+
+    Awaitility
+      .await("All messages are consumed")
+      .atMost(Duration.ofSeconds(5))
+      .until(() -> mockRecordHandler.getNumMessagesHandled() == 20);
+  }
+
   @DisplayName("A deserialization exception is skipped and all valid messages are consumed")
   @Test
   void testDeserializationException()
@@ -223,6 +258,42 @@ public class ExampleConsumerTest
       .until(() -> mockRecordHandler.getNumMessagesHandled() == 20);
   }
 
+  @DisplayName("All messages on a partition are delayed and a message, that triggers a retryable exception in the domain-logic, is retried and all messages are eventually consumed")
+  @ParameterizedTest(name = "Delay for normal messages: {0}ms")
+  @ValueSource(ints = { 10, 20, 50, 100, 150, 200 })
+  void testOneMessageCausesRetryableDomainErrorsWhileAllMessagesAreDelayed(int delay)
+  {
+    createExampleConsumer();
+    mockRecordHandler.normalRecordHandlingDelay = Duration.ofMillis(delay);
+    mockRecordHandler.exceptionalRecordHandlingDelay = Duration.ofMillis(100);
+
+    sendValidMessage(3);
+    sendValidMessage(3);
+    sendValidMessage(3);
+    sendValidMessage(3);
+    sendValidMessage(3);
+    sendValidMessage(3);
+    sendValidMessage(3);
+    sendValidMessage(3);
+    sendValidMessage(3);
+    sendValidMessage(3);
+    sendValidMessage(3);
+    sendValidMessage(3);
+    sendValidMessage(3);
+    sendValidMessage(3);
+    sendValidMessage(3);
+    sendValidMessage(3);
+    sendValidMessage(3);
+    sendValidMessage(3);
+    sendMessageThatTriggersRetriableExceptionInDomain(3, 1);
+    sendValidMessage(3);
+
+    Awaitility
+      .await("All messages are eventually consumed")
+      .atMost(Duration.ofSeconds(15))
+      .until(() -> mockRecordHandler.getNumMessagesHandled() == 20);
+  }
+
   @DisplayName("A message, that triggers a retryable exception in the domain-logic, but fails too often, is skipped and all other messages are eventually consumed")
   @Test
   void testOneMessageCausesRetryableDomainErrors()
@@ -401,6 +472,7 @@ public class ExampleConsumerTest
   @BeforeEach
   void resetParameters()
   {
+    mockRecordHandler.normalRecordHandlingDelay = Duration.ofMillis(0);
     mockRecordHandler.exceptionalRecordHandlingDelay = Duration.ofMillis(0);
   }
 
index 37be0b4..c294256 100644 (file)
@@ -14,6 +14,7 @@ public class MockRecordHandler implements RecordHandler<String, Long>
 {
   private final Map<OffsetInPartition, Integer> retriableErrors = new HashMap<>();
 
+  Duration normalRecordHandlingDelay;
   Duration exceptionalRecordHandlingDelay;
 
   private int numMessagesHandled = 0;
@@ -36,6 +37,7 @@ public class MockRecordHandler implements RecordHandler<String, Long>
       generateError(new OffsetInPartition(offset, partition), value.intValue());
     }
 
+    sleep(normalRecordHandlingDelay);
     numMessagesHandled++;
     log.trace("Handled {} messages so far", numMessagesHandled);
   }