From 4263f414b9f34e523b52ffc45c55cf9fc0e6d0d4 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Mon, 16 Dec 2024 21:29:24 +0100 Subject: [PATCH] =?utf8?q?RED:=20Erwartetes=20Verhalten=20f=C3=BCr=20die?= =?utf8?q?=20Fehlerbehandlung=20definiert?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- .../de/juplo/kafka/ExampleConsumerTest.java | 215 +++++++++++++++++- .../de/juplo/kafka/MockRecordHandler.java | 91 +++++++- 2 files changed, 295 insertions(+), 11 deletions(-) diff --git a/src/test/java/de/juplo/kafka/ExampleConsumerTest.java b/src/test/java/de/juplo/kafka/ExampleConsumerTest.java index b480fa38..c2534f20 100644 --- a/src/test/java/de/juplo/kafka/ExampleConsumerTest.java +++ b/src/test/java/de/juplo/kafka/ExampleConsumerTest.java @@ -12,6 +12,9 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; +import org.junit.jupiter.params.provider.ValueSource; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration; @@ -31,8 +34,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import java.util.stream.IntStream; -import static de.juplo.kafka.ExampleConsumerTest.NUM_PARTITIONS; -import static de.juplo.kafka.ExampleConsumerTest.TOPIC; +import static de.juplo.kafka.ExampleConsumerTest.*; @SpringBootTest( @@ -43,6 +45,9 @@ import static de.juplo.kafka.ExampleConsumerTest.TOPIC; }, properties = { "juplo.bootstrap-server=${spring.embedded.kafka.brokers}", + "juplo.consumer.max-poll-interval=" + MAX_POLL_INTERVALL_MS + "ms", + "juplo.consumer.max-time-per-record=" + ERROR_TIMEOUT_MS + "ms", + "juplo.consumer.num-retries=" + NUM_RETRIES, "spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.ByteArraySerializer", "logging.level.de.juplo.kafka=TRACE", }) @@ -143,10 +148,197 @@ public class ExampleConsumerTest .until(() -> isTerminatedExceptionally.get()); } + @DisplayName("A message, that triggers an non-retryable exception in the domain-logic, is skipped and all other messages are consumed") + @Test + void testNonRetryableDomainError() throws Exception + { + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendMessageThatTriggersNonRetriableExceptionInDomain(3); + sendValidMessage(4); + sendValidMessage(5); + sendValidMessage(6); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendValidMessage(3); + sendValidMessage(4); + sendValidMessage(5); + sendValidMessage(6); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + + Awaitility + .await("All other valid messages are consumed") + .atMost(Duration.ofSeconds(15)) + .until(() -> mockRecordHandler.getNumMessagesHandled() == 19); + } + + @DisplayName("A message, that triggers a retryable exception in the domain-logic, is retried and all messages are eventually consumed") + @ParameterizedTest(name = "Number of failures for the 1. message in partition 3: {0}") + @ValueSource(ints = { 1, 2, 3, 4, 5, 6 }) + void testOneMessageCausesRetryableDomainErrors(int numFailures) + { + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendMessageThatTriggersRetriableExceptionInDomain(3,numFailures); + sendValidMessage(4); + sendValidMessage(5); + sendValidMessage(6); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendValidMessage(3); + sendValidMessage(4); + sendValidMessage(5); + sendValidMessage(6); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + + Awaitility + .await("All messages are eventually consumed") + .atMost(Duration.ofSeconds(15)) + .until(() -> mockRecordHandler.getNumMessagesHandled() == 20); + } + + @DisplayName("A message, that triggers a retryable exception in the domain-logic, but fails too often, is skipped and all other messages are eventually consumed") + @Test + void testOneMessageCausesRetryableDomainErrors() + { + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendMessageThatTriggersRetriableExceptionInDomain(3,66); + sendValidMessage(4); + sendValidMessage(5); + sendValidMessage(6); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendValidMessage(3); + sendValidMessage(4); + sendValidMessage(5); + sendValidMessage(6); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + + Awaitility + .await("All other messages are eventually consumed") + .atMost(Duration.ofSeconds(15)) + .until(() -> mockRecordHandler.getNumMessagesHandled() == 19); + } + + @DisplayName("Three messages, that trigger retryable exceptions in the domain-logic, are retried and all messages are eventually consumed") + @ParameterizedTest(name = "Number of failures for the 1. message in partition 3: {0}, number of failures for the 1. message in partition 6: {1}, number of failures for the 2. message in partition 6: {2}") + @CsvSource({ "1,1,1", "6,3,4", "4,5,2", "1,2,3", "6,6,6" }) + void testThreeMessagesCauseRetryableDomainErrors( + int numFailuresForMessageA, + int numFailuresForMessageB, + int numFailuresForMessageC) + { + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendMessageThatTriggersRetriableExceptionInDomain(3,numFailuresForMessageA); + sendValidMessage(4); + sendValidMessage(5); + sendMessageThatTriggersRetriableExceptionInDomain(6,numFailuresForMessageB); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendValidMessage(3); + sendValidMessage(4); + sendValidMessage(5); + sendMessageThatTriggersRetriableExceptionInDomain(6,numFailuresForMessageC); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendValidMessage(3); + sendValidMessage(4); + sendValidMessage(5); + sendValidMessage(6); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + + Awaitility + .await("All messages are eventually consumed") + .atMost(Duration.ofSeconds(20)) + .until(() -> mockRecordHandler.getNumMessagesHandled() == 30); + } + + @DisplayName("Three messages, that trigger retryable exceptions in the domain-logic, are retried, but one of them fails too often and is skipped, still all other messages are eventually consumed") + @ParameterizedTest(name = "Number of failures for the 1. message in partition 3: {0}, number of failures for the 1. message in partition 6: {1}, number of failures for the 2. message in partition 6: {2}") + @CsvSource({ "66,3,4", "4,66,2", "1,2,66" }) + void testThreeMessagesCauseRetryableDomainErrorsAndOneFailsTooOften( + int numFailuresForMessageA, + int numFailuresForMessageB, + int numFailuresForMessageC) + { + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendMessageThatTriggersRetriableExceptionInDomain(3,numFailuresForMessageA); + sendValidMessage(4); + sendValidMessage(5); + sendMessageThatTriggersRetriableExceptionInDomain(6,numFailuresForMessageB); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendValidMessage(3); + sendValidMessage(4); + sendValidMessage(5); + sendMessageThatTriggersRetriableExceptionInDomain(6,numFailuresForMessageC); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendValidMessage(3); + sendValidMessage(4); + sendValidMessage(5); + sendValidMessage(6); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + + Awaitility + .await("All other messages are eventually consumed") + .atMost(Duration.ofSeconds(20)) + .until(() -> mockRecordHandler.getNumMessagesHandled() == 29); + } + static final String ID = "TEST"; static final String TOPIC = "ExampleConsumerTest_TEST"; static final int NUM_PARTITIONS = 10; + static final int NUM_RETRIES = 6; + static final int MAX_POLL_INTERVALL_MS = 5000; + static final int ERROR_TIMEOUT_MS = 1000; @Autowired KafkaTemplate kafkaTemplate; @@ -182,6 +374,12 @@ public class ExampleConsumerTest () -> isTerminatedExceptionally.set(true)); } + @BeforeEach + void resetParameters() + { + mockRecordHandler.exceptionalRecordHandlingDelay = Duration.ofMillis(ERROR_TIMEOUT_MS); + } + @AfterEach void resetSetup(@Autowired AdminClient adminClient) throws InterruptedException, ExecutionException { @@ -226,6 +424,16 @@ public class ExampleConsumerTest send(partition, VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION); } + private void sendMessageThatTriggersNonRetriableExceptionInDomain(int partition) + { + send(partition, serializer.serialize(TOPIC,(long)VALUE_THAT_TRIGGERS_NONRETRIABLEEXCEPTION)); + } + + private void sendMessageThatTriggersRetriableExceptionInDomain(int partition, int numFailures) + { + send(partition, serializer.serialize(TOPIC,(long)VALUE_THAT_TRIGGERS_RETRIABLEEXCEPTION * numFailures)); + } + private void send(int partition, long message) { send(partition, serializer.serialize(TOPIC, message)); @@ -245,6 +453,9 @@ public class ExampleConsumerTest public final static int VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION = -1; + public final static int VALUE_THAT_TRIGGERS_RETRIABLEEXCEPTION = -2; + public final static int VALUE_THAT_TRIGGERS_NONRETRIABLEEXCEPTION = -3; + @TestConfiguration diff --git a/src/test/java/de/juplo/kafka/MockRecordHandler.java b/src/test/java/de/juplo/kafka/MockRecordHandler.java index 9f540f0e..37be0b42 100644 --- a/src/test/java/de/juplo/kafka/MockRecordHandler.java +++ b/src/test/java/de/juplo/kafka/MockRecordHandler.java @@ -1,48 +1,121 @@ package de.juplo.kafka; -import lombok.Getter; -import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import static de.juplo.kafka.ExampleConsumerTest.VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION; +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; + +import static de.juplo.kafka.ExampleConsumerTest.*; -@RequiredArgsConstructor @Slf4j public class MockRecordHandler implements RecordHandler { - @Getter + private final Map retriableErrors = new HashMap<>(); + + Duration exceptionalRecordHandlingDelay; + private int numMessagesHandled = 0; + public int getNumMessagesHandled() + { + return numMessagesHandled; + } + @Override public void handleRecord( String topic, Integer partition, Long offset, String key, - Long value) + Long value) throws RetriableErrorException, NonRetriableErrorException { if (value != null && value < 0) { - generateError(value); + generateError(new OffsetInPartition(offset, partition), value.intValue()); } numMessagesHandled++; log.trace("Handled {} messages so far", numMessagesHandled); } - private void generateError(long value) + private void generateError( + OffsetInPartition offset, + int value) throws RetriableErrorException, NonRetriableErrorException { if (value == VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION) { throw new RuntimeException("Unexpected application error!"); } - log.info("Not specifically mapped error: {}", value); + if (value == VALUE_THAT_TRIGGERS_NONRETRIABLEEXCEPTION) + { + throw new NonRetriableErrorException("Non-Retryable application error!"); + } + + if ((float)value % (float)VALUE_THAT_TRIGGERS_RETRIABLEEXCEPTION == 0f) + { + int totalOccurrences = value / VALUE_THAT_TRIGGERS_RETRIABLEEXCEPTION; + int occurrence = retriableErrors.compute( + offset, + (k, v) -> + { + if (v == null) + { + v = totalOccurrences; + } + else + { + v--; + } + + return v; + }); + + if (occurrence <= 0) + { + retriableErrors.remove(offset); + } + else + { + log.debug( + "Simulating occurrence #{} of {} for a retryable error at offset {} in partition {}", + totalOccurrences - occurrence + 1, + totalOccurrences, + offset.offset, + offset.partition); + sleep(exceptionalRecordHandlingDelay); + throw new RetriableErrorException("Retryable application error! Occurrence #" + (totalOccurrences - occurrence + 1)); + } + + log.info("Simulating a resolved retryable error after {} occurrences of the error", totalOccurrences); + } + else + { + log.warn("Not specifically mapped error: {}", value); + } + } + + private void sleep(Duration duration) + { + try + { + Thread.sleep(duration); + } + catch(InterruptedException e) + { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } } public void clear() { + retriableErrors.clear(); numMessagesHandled = 0; } + + + private static record OffsetInPartition(long offset, int partition) {} } -- 2.20.1