GREEN: Erwartetes Verhalten für unterschiedliche Delays definiert
authorKai Moritz <kai@juplo.de>
Sun, 22 Dec 2024 19:57:31 +0000 (20:57 +0100)
committerKai Moritz <kai@juplo.de>
Sun, 5 Jan 2025 11:26:36 +0000 (12:26 +0100)
src/main/java/de/juplo/kafka/ExampleConsumer.java
src/test/java/de/juplo/kafka/ExampleConsumerTest.java
src/test/java/de/juplo/kafka/MockRecordHandler.java

index c720815..504d1b5 100644 (file)
@@ -119,6 +119,7 @@ public class ExampleConsumer implements ConsumerRebalanceListener, Runnable
 
               Instant now = clock.instant();
               Duration timeLeft = Duration.between(now, deadline);
+              log.trace("{} - Time left for current poll: {}", id, timeLeft);
 
               if (timeLeft.minus(minTimeForNextRecord).isNegative())
               {
index 4394e9a..53ca030 100644 (file)
@@ -56,7 +56,7 @@ import static de.juplo.kafka.ExampleConsumerTest.*;
 @EmbeddedKafka(topics = TOPIC, partitions = NUM_PARTITIONS)
 public class ExampleConsumerTest
 {
-  @DisplayName("All messages are consumed")
+  @DisplayName("All messages are consumed as expected")
   @Test
   void testOnlyValidMessages()
   {
@@ -89,6 +89,41 @@ public class ExampleConsumerTest
       .until(() -> mockRecordHandler.getNumMessagesHandled() == 20);
   }
 
+  @DisplayName("Delayed messages are consumed as expected")
+  @ParameterizedTest(name = "delay for message-consumption: {0}")
+  @ValueSource(ints = { 10, 25, 50, 75, 100 })
+  void testOnlyValidMessagesButAllDelayed(int delay)
+  {
+    createExampleConsumer();
+    mockRecordHandler.normalRecordHandlingDelay = Duration.ofMillis(delay);
+
+    sendValidMessage(0);
+    sendValidMessage(1);
+    sendValidMessage(2);
+    sendValidMessage(3);
+    sendValidMessage(4);
+    sendValidMessage(5);
+    sendValidMessage(6);
+    sendValidMessage(7);
+    sendValidMessage(8);
+    sendValidMessage(9);
+    sendValidMessage(0);
+    sendValidMessage(1);
+    sendValidMessage(2);
+    sendValidMessage(3);
+    sendValidMessage(4);
+    sendValidMessage(5);
+    sendValidMessage(6);
+    sendValidMessage(7);
+    sendValidMessage(8);
+    sendValidMessage(9);
+
+    Awaitility
+      .await("All messages are consumed")
+      .atMost(Duration.ofSeconds(5))
+      .until(() -> mockRecordHandler.getNumMessagesHandled() == 20);
+  }
+
   @DisplayName("A deserialization exception is skipped and all valid messages are consumed")
   @Test
   void testDeserializationException()
@@ -401,6 +436,7 @@ public class ExampleConsumerTest
   @BeforeEach
   void resetParameters()
   {
+    mockRecordHandler.normalRecordHandlingDelay = Duration.ofMillis(0);
     mockRecordHandler.exceptionalRecordHandlingDelay = Duration.ofMillis(0);
   }
 
index 37be0b4..c294256 100644 (file)
@@ -14,6 +14,7 @@ public class MockRecordHandler implements RecordHandler<String, Long>
 {
   private final Map<OffsetInPartition, Integer> retriableErrors = new HashMap<>();
 
+  Duration normalRecordHandlingDelay;
   Duration exceptionalRecordHandlingDelay;
 
   private int numMessagesHandled = 0;
@@ -36,6 +37,7 @@ public class MockRecordHandler implements RecordHandler<String, Long>
       generateError(new OffsetInPartition(offset, partition), value.intValue());
     }
 
+    sleep(normalRecordHandlingDelay);
     numMessagesHandled++;
     log.trace("Handled {} messages so far", numMessagesHandled);
   }