From 1ebd3021e10996f542a9bd9d3ebdbc2afcaf48e9 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 22 Dec 2024 20:57:31 +0100 Subject: [PATCH] =?utf8?q?GREEN:=20Erwartetes=20Verhalten=20f=C3=BCr=20unt?= =?utf8?q?erschiedliche=20Delays=20definiert?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- .../java/de/juplo/kafka/ExampleConsumer.java | 1 + .../de/juplo/kafka/ExampleConsumerTest.java | 74 ++++++++++++++++++- .../de/juplo/kafka/MockRecordHandler.java | 2 + 3 files changed, 76 insertions(+), 1 deletion(-) diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index 89c75f13..3a6695ef 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -119,6 +119,7 @@ public class ExampleConsumer implements ConsumerRebalanceListener, Runnable Instant now = clock.instant(); Duration timeLeft = Duration.between(now, deadline); + log.trace("{} - Time left for current poll: {}", id, timeLeft); if (timeLeft.minus(minTimeForNextRecord).isNegative()) { diff --git a/src/test/java/de/juplo/kafka/ExampleConsumerTest.java b/src/test/java/de/juplo/kafka/ExampleConsumerTest.java index 4394e9a1..8cf1e22d 100644 --- a/src/test/java/de/juplo/kafka/ExampleConsumerTest.java +++ b/src/test/java/de/juplo/kafka/ExampleConsumerTest.java @@ -56,7 +56,7 @@ import static de.juplo.kafka.ExampleConsumerTest.*; @EmbeddedKafka(topics = TOPIC, partitions = NUM_PARTITIONS) public class ExampleConsumerTest { - @DisplayName("All messages are consumed") + @DisplayName("All messages are consumed as expected") @Test void testOnlyValidMessages() { @@ -89,6 +89,41 @@ public class ExampleConsumerTest .until(() -> mockRecordHandler.getNumMessagesHandled() == 20); } + @DisplayName("Delayed messages are consumed as expected") + @ParameterizedTest(name = "delay for message-consumption: {0}") + @ValueSource(ints = { 10, 25, 50, 75, 100 }) + void testOnlyValidMessagesButAllDelayed(int delay) + { + createExampleConsumer(); + mockRecordHandler.normalRecordHandlingDelay = Duration.ofMillis(delay); + + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendValidMessage(3); + sendValidMessage(4); + sendValidMessage(5); + sendValidMessage(6); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendValidMessage(3); + sendValidMessage(4); + sendValidMessage(5); + sendValidMessage(6); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + + Awaitility + .await("All messages are consumed") + .atMost(Duration.ofSeconds(5)) + .until(() -> mockRecordHandler.getNumMessagesHandled() == 20); + } + @DisplayName("A deserialization exception is skipped and all valid messages are consumed") @Test void testDeserializationException() @@ -223,6 +258,42 @@ public class ExampleConsumerTest .until(() -> mockRecordHandler.getNumMessagesHandled() == 20); } + @DisplayName("All messages on a partition are delayed and a message, that triggers a retryable exception in the domain-logic, is retried and all messages are eventually consumed") + @ParameterizedTest(name = "Delay for normal messages: {0}ms") + @ValueSource(ints = { 10, 20, 50, 100, 150, 200 }) + void testOneMessageCausesRetryableDomainErrorsWhileAllMessagesAreDelayed(int delay) + { + createExampleConsumer(); + mockRecordHandler.normalRecordHandlingDelay = Duration.ofMillis(delay); + mockRecordHandler.exceptionalRecordHandlingDelay = Duration.ofMillis(100); + + sendValidMessage(3); + sendValidMessage(3); + sendValidMessage(3); + sendValidMessage(3); + sendValidMessage(3); + sendValidMessage(3); + sendValidMessage(3); + sendValidMessage(3); + sendValidMessage(3); + sendValidMessage(3); + sendValidMessage(3); + sendValidMessage(3); + sendValidMessage(3); + sendValidMessage(3); + sendValidMessage(3); + sendValidMessage(3); + sendValidMessage(3); + sendValidMessage(3); + sendMessageThatTriggersRetriableExceptionInDomain(3, 1); + sendValidMessage(3); + + Awaitility + .await("All messages are eventually consumed") + .atMost(Duration.ofSeconds(15)) + .until(() -> mockRecordHandler.getNumMessagesHandled() == 20); + } + @DisplayName("A message, that triggers a retryable exception in the domain-logic, but fails too often, is skipped and all other messages are eventually consumed") @Test void testOneMessageCausesRetryableDomainErrors() @@ -401,6 +472,7 @@ public class ExampleConsumerTest @BeforeEach void resetParameters() { + mockRecordHandler.normalRecordHandlingDelay = Duration.ofMillis(0); mockRecordHandler.exceptionalRecordHandlingDelay = Duration.ofMillis(0); } diff --git a/src/test/java/de/juplo/kafka/MockRecordHandler.java b/src/test/java/de/juplo/kafka/MockRecordHandler.java index 37be0b42..c294256b 100644 --- a/src/test/java/de/juplo/kafka/MockRecordHandler.java +++ b/src/test/java/de/juplo/kafka/MockRecordHandler.java @@ -14,6 +14,7 @@ public class MockRecordHandler implements RecordHandler { private final Map retriableErrors = new HashMap<>(); + Duration normalRecordHandlingDelay; Duration exceptionalRecordHandlingDelay; private int numMessagesHandled = 0; @@ -36,6 +37,7 @@ public class MockRecordHandler implements RecordHandler generateError(new OffsetInPartition(offset, partition), value.intValue()); } + sleep(normalRecordHandlingDelay); numMessagesHandled++; log.trace("Handled {} messages so far", numMessagesHandled); } -- 2.20.1