From 20e3da97b6e0ec950f91dcb6a6c1de161c216e24 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 11 Jan 2025 00:41:42 +0100 Subject: [PATCH] =?utf8?q?Test=20f=C3=BCr=20viele=20Fehler=20in=20einer=20?= =?utf8?q?Partition=20in=20`BackOffStateTest`=20erg=C3=A4nzt?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- .../de/juplo/kafka/ExampleConsumerTest.java | 37 +++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/src/test/java/de/juplo/kafka/ExampleConsumerTest.java b/src/test/java/de/juplo/kafka/ExampleConsumerTest.java index 35be4842..1f9713fa 100644 --- a/src/test/java/de/juplo/kafka/ExampleConsumerTest.java +++ b/src/test/java/de/juplo/kafka/ExampleConsumerTest.java @@ -50,6 +50,7 @@ import static org.assertj.core.api.Assertions.assertThat; "juplo.consumer.poll-request-timeout=" + POLL_REQUEST_TIMEOUT_MS + "ms", "juplo.consumer.max-poll-interval=" + MAX_POLL_INTERVALL_MS + "ms", "juplo.consumer.max-time-per-record=" + MAX_TIME_PER_RECORD_MS + "ms", + "juplo.consumer.fetch-max-wait=" + FETCH_MAX_WAIT_MS + "ms", "juplo.consumer.min-slack-per-poll-interval=" + MIN_SLACK_PER_POLL_INTERVAL_MS + "ms", "juplo.consumer.num-retries=" + NUM_RETRIES, "spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.ByteArraySerializer", @@ -329,6 +330,41 @@ public class ExampleConsumerTest .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(19)); } + @DisplayName("Several messages are triggering retryable exception in one partition, some of them fail so often, that they are skipped and all successful messages are eventually consumed") + @ParameterizedTest(name = "Delay for errors: {0}ms") + @ValueSource(ints = { 10, 20, 50, 100, 150, 200 }) + void testSeveralMessagesCausesRetryableDomainErrorsInOnePartition(int delay) + { + createExampleConsumer(); + mockRecordHandler.exceptionalRecordHandlingDelay = Duration.ofMillis(delay); + + sendValidMessage(3); + sendMessageThatTriggersRetriableExceptionInDomain(3, 4); + sendValidMessage(3); + sendValidMessage(3); + sendMessageThatTriggersRetriableExceptionInDomain(3, 6); + sendMessageThatTriggersRetriableExceptionInDomain(3, 1); + sendValidMessage(3); + sendMessageThatTriggersRetriableExceptionInDomain(3, 66); + sendMessageThatTriggersRetriableExceptionInDomain(3, 5); + sendValidMessage(3); + sendValidMessage(3); + sendMessageThatTriggersRetriableExceptionInDomain(3, 66); + sendMessageThatTriggersRetriableExceptionInDomain(3, 6); + sendMessageThatTriggersRetriableExceptionInDomain(3, 66); + sendMessageThatTriggersRetriableExceptionInDomain(3, 3); + sendValidMessage(3); + sendMessageThatTriggersRetriableExceptionInDomain(3, 66); + sendValidMessage(3); + sendMessageThatTriggersRetriableExceptionInDomain(3, 1); + sendValidMessage(3); + + Awaitility + .await("All other messages are eventually consumed") + .atMost(Duration.ofSeconds(15)) + .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(16)); + } + @DisplayName("Three messages, that trigger retryable exceptions in the domain-logic, are retried and all messages are eventually consumed") @ParameterizedTest(name = "Number of failures for the 1. message in partition 3: {0}, number of failures for the 1. message in partition 6: {1}, number of failures for the 2. message in partition 6: {2}") @CsvSource({ "1,1,1", "6,3,4", "4,5,2", "1,2,3", "6,6,6" }) @@ -466,6 +502,7 @@ public class ExampleConsumerTest static final int POLL_REQUEST_TIMEOUT_MS = 50; static final int MAX_POLL_INTERVALL_MS = 500; static final int MAX_TIME_PER_RECORD_MS = 100; + static final int FETCH_MAX_WAIT_MS = 50; static final int MIN_SLACK_PER_POLL_INTERVAL_MS = 100; @Autowired -- 2.20.1