From: Kai Moritz Date: Fri, 10 Jan 2025 23:41:42 +0000 (+0100) Subject: Test für viele Fehler in einer Partition in `BackOffStateTest` ergänzt X-Git-Tag: consumer/spring-consumer--error-handling--COMMITS--2025-02~5 X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=1aeee3cb2e0d50468be5d7017626724a8b1ff19b;p=demos%2Fkafka%2Ftraining Test für viele Fehler in einer Partition in `BackOffStateTest` ergänzt --- diff --git a/src/test/java/de/juplo/kafka/ExampleConsumerTest.java b/src/test/java/de/juplo/kafka/ExampleConsumerTest.java index 35be4842..1f9713fa 100644 --- a/src/test/java/de/juplo/kafka/ExampleConsumerTest.java +++ b/src/test/java/de/juplo/kafka/ExampleConsumerTest.java @@ -50,6 +50,7 @@ import static org.assertj.core.api.Assertions.assertThat; "juplo.consumer.poll-request-timeout=" + POLL_REQUEST_TIMEOUT_MS + "ms", "juplo.consumer.max-poll-interval=" + MAX_POLL_INTERVALL_MS + "ms", "juplo.consumer.max-time-per-record=" + MAX_TIME_PER_RECORD_MS + "ms", + "juplo.consumer.fetch-max-wait=" + FETCH_MAX_WAIT_MS + "ms", "juplo.consumer.min-slack-per-poll-interval=" + MIN_SLACK_PER_POLL_INTERVAL_MS + "ms", "juplo.consumer.num-retries=" + NUM_RETRIES, "spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.ByteArraySerializer", @@ -329,6 +330,41 @@ public class ExampleConsumerTest .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(19)); } + @DisplayName("Several messages are triggering retryable exception in one partition, some of them fail so often, that they are skipped and all successful messages are eventually consumed") + @ParameterizedTest(name = "Delay for errors: {0}ms") + @ValueSource(ints = { 10, 20, 50, 100, 150, 200 }) + void testSeveralMessagesCausesRetryableDomainErrorsInOnePartition(int delay) + { + createExampleConsumer(); + mockRecordHandler.exceptionalRecordHandlingDelay = Duration.ofMillis(delay); + + sendValidMessage(3); + sendMessageThatTriggersRetriableExceptionInDomain(3, 4); + sendValidMessage(3); + sendValidMessage(3); + sendMessageThatTriggersRetriableExceptionInDomain(3, 6); + sendMessageThatTriggersRetriableExceptionInDomain(3, 1); + sendValidMessage(3); + sendMessageThatTriggersRetriableExceptionInDomain(3, 66); + sendMessageThatTriggersRetriableExceptionInDomain(3, 5); + sendValidMessage(3); + sendValidMessage(3); + sendMessageThatTriggersRetriableExceptionInDomain(3, 66); + sendMessageThatTriggersRetriableExceptionInDomain(3, 6); + sendMessageThatTriggersRetriableExceptionInDomain(3, 66); + sendMessageThatTriggersRetriableExceptionInDomain(3, 3); + sendValidMessage(3); + sendMessageThatTriggersRetriableExceptionInDomain(3, 66); + sendValidMessage(3); + sendMessageThatTriggersRetriableExceptionInDomain(3, 1); + sendValidMessage(3); + + Awaitility + .await("All other messages are eventually consumed") + .atMost(Duration.ofSeconds(15)) + .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(16)); + } + @DisplayName("Three messages, that trigger retryable exceptions in the domain-logic, are retried and all messages are eventually consumed") @ParameterizedTest(name = "Number of failures for the 1. message in partition 3: {0}, number of failures for the 1. message in partition 6: {1}, number of failures for the 2. message in partition 6: {2}") @CsvSource({ "1,1,1", "6,3,4", "4,5,2", "1,2,3", "6,6,6" }) @@ -466,6 +502,7 @@ public class ExampleConsumerTest static final int POLL_REQUEST_TIMEOUT_MS = 50; static final int MAX_POLL_INTERVALL_MS = 500; static final int MAX_TIME_PER_RECORD_MS = 100; + static final int FETCH_MAX_WAIT_MS = 50; static final int MIN_SLACK_PER_POLL_INTERVAL_MS = 100; @Autowired