From: Kai Moritz Date: Sun, 22 Dec 2024 19:57:31 +0000 (+0100) Subject: GREEN: Erwartetes Verhalten für unterschiedliche Delays definiert X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=304f28ddf04b14902dbf8d306a6a4958c6e343b9;p=demos%2Fkafka%2Ftraining GREEN: Erwartetes Verhalten für unterschiedliche Delays definiert --- diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index c7208159..504d1b56 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -119,6 +119,7 @@ public class ExampleConsumer implements ConsumerRebalanceListener, Runnable Instant now = clock.instant(); Duration timeLeft = Duration.between(now, deadline); + log.trace("{} - Time left for current poll: {}", id, timeLeft); if (timeLeft.minus(minTimeForNextRecord).isNegative()) { diff --git a/src/test/java/de/juplo/kafka/ExampleConsumerTest.java b/src/test/java/de/juplo/kafka/ExampleConsumerTest.java index 4394e9a1..53ca0308 100644 --- a/src/test/java/de/juplo/kafka/ExampleConsumerTest.java +++ b/src/test/java/de/juplo/kafka/ExampleConsumerTest.java @@ -56,7 +56,7 @@ import static de.juplo.kafka.ExampleConsumerTest.*; @EmbeddedKafka(topics = TOPIC, partitions = NUM_PARTITIONS) public class ExampleConsumerTest { - @DisplayName("All messages are consumed") + @DisplayName("All messages are consumed as expected") @Test void testOnlyValidMessages() { @@ -89,6 +89,41 @@ public class ExampleConsumerTest .until(() -> mockRecordHandler.getNumMessagesHandled() == 20); } + @DisplayName("Delayed messages are consumed as expected") + @ParameterizedTest(name = "delay for message-consumption: {0}") + @ValueSource(ints = { 10, 25, 50, 75, 100 }) + void testOnlyValidMessagesButAllDelayed(int delay) + { + createExampleConsumer(); + mockRecordHandler.normalRecordHandlingDelay = Duration.ofMillis(delay); + + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendValidMessage(3); + sendValidMessage(4); + sendValidMessage(5); + sendValidMessage(6); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendValidMessage(3); + sendValidMessage(4); + sendValidMessage(5); + sendValidMessage(6); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + + Awaitility + .await("All messages are consumed") + .atMost(Duration.ofSeconds(5)) + .until(() -> mockRecordHandler.getNumMessagesHandled() == 20); + } + @DisplayName("A deserialization exception is skipped and all valid messages are consumed") @Test void testDeserializationException() @@ -401,6 +436,7 @@ public class ExampleConsumerTest @BeforeEach void resetParameters() { + mockRecordHandler.normalRecordHandlingDelay = Duration.ofMillis(0); mockRecordHandler.exceptionalRecordHandlingDelay = Duration.ofMillis(0); } diff --git a/src/test/java/de/juplo/kafka/MockRecordHandler.java b/src/test/java/de/juplo/kafka/MockRecordHandler.java index 37be0b42..c294256b 100644 --- a/src/test/java/de/juplo/kafka/MockRecordHandler.java +++ b/src/test/java/de/juplo/kafka/MockRecordHandler.java @@ -14,6 +14,7 @@ public class MockRecordHandler implements RecordHandler { private final Map retriableErrors = new HashMap<>(); + Duration normalRecordHandlingDelay; Duration exceptionalRecordHandlingDelay; private int numMessagesHandled = 0; @@ -36,6 +37,7 @@ public class MockRecordHandler implements RecordHandler generateError(new OffsetInPartition(offset, partition), value.intValue()); } + sleep(normalRecordHandlingDelay); numMessagesHandled++; log.trace("Handled {} messages so far", numMessagesHandled); }