From d662b8bf44be75933910c027891473be4238664a Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Mon, 14 Apr 2025 22:13:57 +0200 Subject: [PATCH] =?utf8?q?Version=20des=20`spring-consumer`=20mit=20einer?= =?utf8?q?=20vollst=C3=A4ndigen=20Fehlerbehandlung?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit * Exceptions & Konfig für wiederholbare und nicht wiederholbare Fehler * RED: Erwartetes Verhalten für die Fehlerbehandlung definiert * GREEN: Erweitertes Error-Handling implementiert * Timeout für den Poll-Request konfigurierbar gemacht * Timings für den `ExampleConsumerTest` enger gezogen * Keine Verzögerung für Fehler in den Retry-Tests * Zusätzliche Logging-Meldung für Retry-Ablauf * Nicht ausgewertete Unterscheidung entfernt * Erzeugung des `ExampleConsumer` im Tests über Methode konfigurierbar * GREEN: Erwartetes Verhalten für unterschiedliche Delays definiert * Slack für Poll-Intervall ist explizit konfigurierbar * Test für Retries mit fixem Back-Off in `ExampleConsumerTest` ergänzt * Der Offset eines inaktiven `BackOffState` sollte ein ungültiger Wert sein * Innere Klasse `BackOffState` statisch gemacht * Innere Klasse `BackOffState` extrahiert * Logging in `BackOffState` verbessert * Eindeutigere Methodennamen in `BackOffState` * RED - Unit-Test für `BackOffState` implementiert * GREEN - Fehler in der Initialisierung von `BackOffState` korrigiert * Doppelten Code in `BackOffStateTest` in Methoden ausgelagert * Mocking mit `@Mock` auf Klassenebene erspart Parameter-Schlacht * Aussagelose Tests aus `BackOffStateTest` entfernt * BackOff-Zeit in `BackOffStateTest` in statische Variable ausgelagert * Logging-Meldung des `BackOffState` vereinfacht und verbessert * RED: Korrigiertes Verhalten für `BackOffState` definiert * GREEN: Implementierung von `BackOffState` korrigiert * Umstellung des `ExampleConsumerTest` auf AssertJ * `fetch.max.wait` konfigurierbar gemacht * Test für viele Fehler in einer Partition in 
`BackOffStateTest` ergänzt * Überprüfung des ``BackOffState``-Zustands berücksichtigt immer den Offset * Bedingungsloser Reset (und besserer Methoden-Name) für `BackOffState` * `BackOffState` wird nur 1x erzeugt und danach zurückgesetzt und gestartet * Überflüssiges Attribut in `BackOffState` entfernt Neu: * Test für unterschiedliche Nachrichten-Typen vorbereitet. --- .../juplo/kafka/ApplicationConfiguration.java | 37 ++ .../de/juplo/kafka/ApplicationProperties.java | 10 + .../java/de/juplo/kafka/BackOffState.java | 98 +++++ .../java/de/juplo/kafka/ExampleConsumer.java | 150 ++++++- .../kafka/NonRetryableErrorException.java | 9 + .../java/de/juplo/kafka/RecordHandler.java | 2 +- .../juplo/kafka/RetryableErrorException.java | 9 + src/main/resources/application.yml | 10 + .../kafka/AbstractExampleConsumerTest.java | 406 +++++++++++++++++- .../kafka/AbstractMockRecordHandler.java | 35 +- .../java/de/juplo/kafka/BackOffStateTest.java | 304 +++++++++++++ .../juplo/kafka/LongExampleConsumerTest.java | 14 + .../de/juplo/kafka/LongMockRecordHandler.java | 60 ++- 13 files changed, 1121 insertions(+), 23 deletions(-) create mode 100644 src/main/java/de/juplo/kafka/BackOffState.java create mode 100644 src/main/java/de/juplo/kafka/NonRetryableErrorException.java create mode 100644 src/main/java/de/juplo/kafka/RetryableErrorException.java create mode 100644 src/test/java/de/juplo/kafka/BackOffStateTest.java diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 680a8390..41d5911c 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -2,12 +2,19 @@ package de.juplo.kafka; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.producer.ProducerConfig; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import 
org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.util.backoff.BackOff; +import org.springframework.util.backoff.FixedBackOff; + +import java.time.Clock; +import java.time.Duration; +import java.util.Properties; @Configuration @@ -15,20 +22,38 @@ import org.springframework.kafka.core.ConsumerFactory; @Slf4j public class ApplicationConfiguration { + public final static String MAX_POLL_INTERVALL_CONFIG_KEY = "max.poll.interval.ms"; + public final static Duration MAX_POLL_INTERVALL_DEFAULT_VALUE = Duration.ofMinutes(5); @Bean public ExampleConsumer exampleConsumer( Consumer kafkaConsumer, RecordHandler recordHandler, ApplicationProperties properties, KafkaProperties kafkaProperties, + Clock clock, + BackOff backOffStrategy, ConfigurableApplicationContext applicationContext) { + String maxPollIntervalMs = kafkaProperties + .getConsumer() + .getProperties() + .get(MAX_POLL_INTERVALL_CONFIG_KEY); + Duration maxPollInterval = maxPollIntervalMs == null + ? 
MAX_POLL_INTERVALL_DEFAULT_VALUE + : Duration.ofMillis(Integer.valueOf(maxPollIntervalMs)); + return new ExampleConsumer<>( kafkaProperties.getClientId(), properties.getConsumerProperties().getTopic(), kafkaConsumer, recordHandler, + clock, + properties.getConsumerProperties().getPollRequestTimeout(), + maxPollInterval, + properties.getConsumerProperties().getMaxTimePerRecord(), + properties.getConsumerProperties().getMinSlackPerPollInterval(), + backOffStrategy, () -> applicationContext.close()); } @@ -38,6 +63,18 @@ public class ApplicationConfiguration return (topic, partition, offset, key, value) -> log.info("No-Ops Handler called for {}={}", key, value); } + @Bean + public BackOff backOffStrategy(ApplicationProperties properties) + { + return new FixedBackOff(0l, properties.getConsumerProperties().getNumRetries()); + } + + @Bean + public Clock clock() + { + return Clock.systemDefaultZone(); + } + @Bean(destroyMethod = "") public Consumer kafkaConsumer(ConsumerFactory consumerFactory) { diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java index 22c755e8..1d4a32b5 100644 --- a/src/main/java/de/juplo/kafka/ApplicationProperties.java +++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java @@ -7,6 +7,8 @@ import lombok.Setter; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.validation.annotation.Validated; +import java.time.Duration; + @ConfigurationProperties(prefix = "juplo") @Validated @@ -32,5 +34,13 @@ public class ApplicationProperties @NotNull @NotEmpty private String topic; + @NotNull + private Duration pollRequestTimeout; + @NotNull + private Duration maxTimePerRecord; + @NotNull + private Duration minSlackPerPollInterval; + @NotNull + private int numRetries; } } diff --git a/src/main/java/de/juplo/kafka/BackOffState.java b/src/main/java/de/juplo/kafka/BackOffState.java new file mode 100644 index 00000000..8c6785e3 --- 
/dev/null +++ b/src/main/java/de/juplo/kafka/BackOffState.java @@ -0,0 +1,98 @@ +package de.juplo.kafka; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.util.backoff.BackOff; +import org.springframework.util.backoff.BackOffExecution; + +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; + + +@RequiredArgsConstructor +@Slf4j +class BackOffState +{ + private final String logPrefix; + private final Clock clock; + private final BackOff backOffStrategy; + + @Getter + private long offset; + private BackOffExecution backOffExecution; + private int numRetries = 0; + private Instant timeNextRetryIsDue; + + + void start(long offset) + { + this.offset = offset; + log.info("{} - Back-Off requested for offset={}", logPrefix, offset); + backOffExecution = backOffStrategy.start(); + initializeNextBackOff(); + } + + boolean isWaitingForNextRetry() + { + if (timeNextRetryIsDue == null) + { + return false; + } + + Instant now = clock.instant(); + Duration remaining = Duration.between(now, timeNextRetryIsDue); + if (remaining.isNegative()) + { + log.info( + "{} - {}. 
retry for offset={}, lateness: {}", + logPrefix, + numRetries, + offset, + remaining.abs()); + initializeNextBackOff(); + return false; + } + else + { + log.info( + "{} - Next retry for offset={} is due in {}", + logPrefix, + offset, + remaining); + return true; + } + } + + boolean isStarted(long offset) + { + return this.offset == offset && backOffExecution != null; + } + + boolean isCompleted(long offset) + { + return this.offset == offset && timeNextRetryIsDue == null; + } + + void reset() + { + timeNextRetryIsDue = null; + offset = -1l; + } + + private void initializeNextBackOff() + { + long backOffMillis = backOffExecution.nextBackOff(); + + if (backOffMillis == BackOffExecution.STOP) + { + timeNextRetryIsDue = null; + } + else + { + numRetries++; + timeNextRetryIsDue = clock.instant().plusMillis(backOffMillis); + } + } +} diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index 7e820ea2..1cd778ec 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -2,23 +2,36 @@ package de.juplo.kafka; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.RecordDeserializationException; import org.apache.kafka.common.errors.WakeupException; +import org.springframework.util.backoff.BackOff; +import java.time.Clock; import java.time.Duration; +import java.time.Instant; import java.util.Arrays; +import java.util.Collection; +import java.util.List; @Slf4j -public class ExampleConsumer implements Runnable +public class ExampleConsumer implements ConsumerRebalanceListener, Runnable { private final String id; private final String topic; private final 
Consumer consumer; private final RecordHandler recordHandler; private final Thread workerThread; + private final Clock clock; + private final Duration pollRequestTimeout; + private final Duration maxPollInterval; + private final Duration minTimeForNextRecord; + private final BackOff backOffStrategy; + private final BackOffState[] backOffState; private final Runnable closeCallback; private long consumed = 0; @@ -29,12 +42,27 @@ public class ExampleConsumer implements Runnable String topic, Consumer consumer, RecordHandler recordHandler, + Clock clock, + Duration pollRequestTimeout, + Duration maxPollInterval, + Duration maxTimePerRecord, + Duration minSlackPerPollInterval, + BackOff backOffStrategy, Runnable closeCallback) { this.id = clientId; this.topic = topic; this.consumer = consumer; this.recordHandler = recordHandler; + this.clock = clock; + this.pollRequestTimeout = pollRequestTimeout; + this.maxPollInterval = maxPollInterval; + this.minTimeForNextRecord = maxTimePerRecord.plus(minSlackPerPollInterval); + this.backOffStrategy = backOffStrategy; + + int numPartitions = consumer.partitionsFor(topic).size(); + log.info("{} - Topic {} has {} partitions", id, topic, numPartitions); + this.backOffState = new BackOffState[numPartitions]; workerThread = new Thread(this, "ExampleConsumer Worker-Thread"); workerThread.start(); @@ -49,23 +77,106 @@ public class ExampleConsumer implements Runnable try { log.info("{} - Subscribing to topic {}", id, topic); - consumer.subscribe(Arrays.asList(topic)); + consumer.subscribe(Arrays.asList(topic), this); while (true) { try { - ConsumerRecords records = consumer.poll(Duration.ofSeconds(1)); + ConsumerRecords records = consumer.poll(pollRequestTimeout); log.info("{} - Received {} messages", id, records.count()); - for (ConsumerRecord record : records) + + Instant deadline = clock.instant().plus(maxPollInterval); + boolean abortCurrentPoll = false; + + for (TopicPartition topicPartition : records.partitions()) { - handleRecord( - 
record.topic(), - record.partition(), - record.offset(), - record.key(), - record.value()); + if (backOffState[topicPartition.partition()].isWaitingForNextRetry()) + { + log.info("{} - {} is blocked, because it is waiting for a retry", id, topicPartition); + consumer.seek(topicPartition, backOffState[topicPartition.partition()].getOffset()); + continue; + } + + List> recordsForPartition = records.records(topicPartition); + log.debug( + "{} - Received {} messages for partition {}", + id, + recordsForPartition.size(), + topicPartition); + + for (ConsumerRecord record : recordsForPartition) + { + if (abortCurrentPoll) + { + consumer.seek(topicPartition, record.offset()); + break; + } + + Instant now = clock.instant(); + Duration timeLeft = Duration.between(now, deadline); + log.trace("{} - Time left for current poll: {}", id, timeLeft); + + if (timeLeft.minus(minTimeForNextRecord).isNegative()) + { + log.info( + "{} - Aborting record handling, because only {} are left until the poll-interval expires!", + id, + timeLeft); + abortCurrentPoll = true; + consumer.seek(topicPartition, record.offset()); + break; + } + + try + { + handleRecord( + record.topic(), + record.partition(), + record.offset(), + record.key(), + record.value()); + } + catch (RetryableErrorException e) + { + // Seeking to the offset of the record, that raised the exception, and + // leaving the loop afterwards, retries the record + int partition = topicPartition.partition(); + if (!backOffState[partition].isStarted(record.offset())) + { + log.info("{} - First occurrence of a retryable error: {}", id, e.toString()); + backOffState[partition].start(record.offset()); + consumer.seek(topicPartition, record.offset()); + break; + } + else + { + if (backOffState[partition].isCompleted(record.offset())) + { + log.warn("{} - Ignoring retryable error: {}", id, e.toString()); + } + else + { + log.info( + "{} - Retry in progress for offset={} in {}, error: {}", + id, + record.offset(), + partition, + 
e.toString()); + consumer.seek(topicPartition, record.offset()); + break; + } + } + } + catch (NonRetryableErrorException e) + { + // Just ignore, to skip + log.warn("{} - Ignoring non-retryable error: {}", id, e.toString()); + } + + backOffState[topicPartition.partition()].reset(); + } } } catch (RecordDeserializationException e) @@ -104,7 +215,7 @@ public class ExampleConsumer implements Runnable Integer partition, Long offset, K key, - V value) + V value) throws RetryableErrorException, NonRetryableErrorException { consumed++; log.info("{} - partition={}-{}, offset={}: {}={}", id, topic, partition, offset, key, value); @@ -112,6 +223,23 @@ public class ExampleConsumer implements Runnable } + @Override + public void onPartitionsAssigned(Collection partitions) + { + partitions.forEach(topicPartition -> + backOffState[topicPartition.partition()] = new BackOffState( + id + " - partition=" + topicPartition.partition(), + clock, + backOffStrategy)); + } + + @Override + public void onPartitionsRevoked(Collection partitions) + { + partitions.forEach(tp -> backOffState[tp.partition()] = null); + } + + public void shutdown() throws InterruptedException { log.info("{} - Waking up the consumer", id); diff --git a/src/main/java/de/juplo/kafka/NonRetryableErrorException.java b/src/main/java/de/juplo/kafka/NonRetryableErrorException.java new file mode 100644 index 00000000..89a05c17 --- /dev/null +++ b/src/main/java/de/juplo/kafka/NonRetryableErrorException.java @@ -0,0 +1,9 @@ +package de.juplo.kafka; + +public class NonRetryableErrorException extends Exception +{ + public NonRetryableErrorException(String message) + { + super(message); + } +} diff --git a/src/main/java/de/juplo/kafka/RecordHandler.java b/src/main/java/de/juplo/kafka/RecordHandler.java index a7b65af2..fe2bbbc6 100644 --- a/src/main/java/de/juplo/kafka/RecordHandler.java +++ b/src/main/java/de/juplo/kafka/RecordHandler.java @@ -7,5 +7,5 @@ public interface RecordHandler Integer partition, Long offset, K key, - 
V value); + V value) throws RetryableErrorException, NonRetryableErrorException; } diff --git a/src/main/java/de/juplo/kafka/RetryableErrorException.java b/src/main/java/de/juplo/kafka/RetryableErrorException.java new file mode 100644 index 00000000..885f15fc --- /dev/null +++ b/src/main/java/de/juplo/kafka/RetryableErrorException.java @@ -0,0 +1,9 @@ +package de.juplo.kafka; + +public class RetryableErrorException extends Exception +{ + public RetryableErrorException(String message) + { + super(message); + } +} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 23e2ff41..0b609a34 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -1,6 +1,10 @@ juplo: consumer: topic: test + poll-request-timeout: 1s + max-time-per-record: 30s + min-slack-per-poll-interval: 1s + num-retries: 10 management: endpoint: shutdown: @@ -28,6 +32,12 @@ spring: consumer: group-id: my-group value-deserializer: org.apache.kafka.common.serialization.LongDeserializer + auto-offset-reset: earliest + auto-commit-interval: 5s + max-poll-records: 500 + fetch-max-wait: 500ms + properties: + max.poll.interval.ms: 300000 logging: level: root: INFO diff --git a/src/test/java/de/juplo/kafka/AbstractExampleConsumerTest.java b/src/test/java/de/juplo/kafka/AbstractExampleConsumerTest.java index d87b1752..e726a573 100644 --- a/src/test/java/de/juplo/kafka/AbstractExampleConsumerTest.java +++ b/src/test/java/de/juplo/kafka/AbstractExampleConsumerTest.java @@ -12,6 +12,9 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; +import org.junit.jupiter.params.provider.ValueSource; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import 
org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration; @@ -21,7 +24,10 @@ import org.springframework.boot.test.context.TestConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.util.backoff.BackOff; +import org.springframework.util.backoff.FixedBackOff; +import java.time.Clock; import java.time.Duration; import java.util.HashMap; import java.util.Map; @@ -30,8 +36,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import java.util.stream.IntStream; -import static de.juplo.kafka.AbstractExampleConsumerTest.NUM_PARTITIONS; -import static de.juplo.kafka.AbstractExampleConsumerTest.TOPIC; +import static de.juplo.kafka.AbstractExampleConsumerTest.*; import static org.assertj.core.api.Assertions.assertThat; @@ -42,7 +47,13 @@ import static org.assertj.core.api.Assertions.assertThat; AbstractExampleConsumerTest.ConsumerRunnableTestConfig.class, }, properties = { + "juplo.consumer.poll-request-timeout=" + POLL_REQUEST_TIMEOUT_MS + "ms", + "juplo.consumer.max-time-per-record=" + MAX_TIME_PER_RECORD_MS + "ms", + "juplo.consumer.min-slack-per-poll-interval=" + MIN_SLACK_PER_POLL_INTERVAL_MS + "ms", + "juplo.consumer.num-retries=" + NUM_RETRIES, "spring.kafka.consumer.auto-offset-reset=earliest", + "spring.kafka.consumer.properties.max.poll.interval.ms=" + MAX_POLL_INTERVALL_MS, + "spring.kafka.consumer.fetch-max-wait=" + FETCH_MAX_WAIT_MS + "ms", "spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.ByteArraySerializer", "logging.level.de.juplo.kafka=TRACE", }) @@ -53,6 +64,43 @@ public abstract class AbstractExampleConsumerTest @Test void testOnlyValidMessages() { + createExampleConsumer(); + + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendValidMessage(3); + sendValidMessage(4); + sendValidMessage(5); + 
sendValidMessage(6); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendValidMessage(3); + sendValidMessage(4); + sendValidMessage(5); + sendValidMessage(6); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + + Awaitility + .await("All messages are consumed") + .atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(20)); + } + + @DisplayName("Delayed messages are consumed as expected") + @ParameterizedTest(name = "delay for message-consumption: {0}") + @ValueSource(ints = { 10, 25, 50, 75, 100 }) + void testOnlyValidMessagesButAllDelayed(int delay) + { + createExampleConsumer(); + mockRecordHandler.normalRecordHandlingDelay = Duration.ofMillis(delay); + sendValidMessage(0); sendValidMessage(1); sendValidMessage(2); @@ -84,6 +132,8 @@ public abstract class AbstractExampleConsumerTest @Test void testDeserializationException() { + createExampleConsumer(); + sendValidMessage(0); sendValidMessage(1); sendValidMessage(2); @@ -115,6 +165,8 @@ public abstract class AbstractExampleConsumerTest @Test void testUnexpectedDomainError() throws Exception { + createExampleConsumer(); + sendValidMessage(0); sendValidMessage(1); sendValidMessage(2); @@ -143,13 +195,322 @@ public abstract class AbstractExampleConsumerTest .untilAsserted(() -> assertThat(isTerminatedExceptionally.get()).isTrue()); } + @DisplayName("A message, that triggers an non-retryable exception in the domain-logic, is skipped and all other messages are consumed") + @Test + void testNonRetryableDomainError() throws Exception + { + createExampleConsumer(); + + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendMessageThatTriggersNonRetryableExceptionInDomain(3); + sendValidMessage(4); + sendValidMessage(5); + sendValidMessage(6); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + sendValidMessage(0); 
+ sendValidMessage(1); + sendValidMessage(2); + sendValidMessage(3); + sendValidMessage(4); + sendValidMessage(5); + sendValidMessage(6); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + + Awaitility + .await("All other valid messages are consumed") + .atMost(Duration.ofSeconds(15)) + .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(19)); + } + + @DisplayName("A message, that triggers a retryable exception in the domain-logic, is retried and all messages are eventually consumed") + @ParameterizedTest(name = "Number of failures for the 1. message in partition 3: {0}") + @ValueSource(ints = { 1, 2, 3, 4, 5, 6 }) + void testOneMessageCausesRetryableDomainErrors(int numFailures) + { + createExampleConsumer(); + + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendMessageThatTriggersRetryableExceptionInDomain(3,numFailures); + sendValidMessage(4); + sendValidMessage(5); + sendValidMessage(6); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendValidMessage(3); + sendValidMessage(4); + sendValidMessage(5); + sendValidMessage(6); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + + Awaitility + .await("All messages are eventually consumed") + .atMost(Duration.ofSeconds(15)) + .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(20)); + } + + @DisplayName("All messages on a partition are delayed and a message, that triggers a retryable exception in the domain-logic, is retried and all messages are eventually consumed") + @ParameterizedTest(name = "Delay for normal messages: {0}ms") + @ValueSource(ints = { 10, 20, 50, 100, 150, 200 }) + void testOneMessageCausesRetryableDomainErrorsWhileAllMessagesAreDelayed(int delay) + { + createExampleConsumer(); + mockRecordHandler.normalRecordHandlingDelay = Duration.ofMillis(delay); + 
mockRecordHandler.exceptionalRecordHandlingDelay = Duration.ofMillis(100); + + sendValidMessage(3); + sendValidMessage(3); + sendValidMessage(3); + sendValidMessage(3); + sendValidMessage(3); + sendValidMessage(3); + sendValidMessage(3); + sendValidMessage(3); + sendValidMessage(3); + sendValidMessage(3); + sendValidMessage(3); + sendValidMessage(3); + sendValidMessage(3); + sendValidMessage(3); + sendValidMessage(3); + sendValidMessage(3); + sendValidMessage(3); + sendValidMessage(3); + sendMessageThatTriggersRetryableExceptionInDomain(3, 1); + sendValidMessage(3); + + Awaitility + .await("All messages are eventually consumed") + .atMost(Duration.ofSeconds(15)) + .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(20)); + } + + @DisplayName("A message, that triggers a retryable exception in the domain-logic, but fails too often, is skipped and all other messages are eventually consumed") + @Test + void testOneMessageCausesRetryableDomainErrors() + { + createExampleConsumer(); + + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendMessageThatTriggersRetryableExceptionInDomain(3,66); + sendValidMessage(4); + sendValidMessage(5); + sendValidMessage(6); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendValidMessage(3); + sendValidMessage(4); + sendValidMessage(5); + sendValidMessage(6); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + + Awaitility + .await("All other messages are eventually consumed") + .atMost(Duration.ofSeconds(15)) + .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(19)); + } + + @DisplayName("Several messages are triggering retryable exception in one partition, some of them fail so often, that they are skipped and all successful messages are eventually consumed") + @ParameterizedTest(name = "Delay for errors: {0}ms") + @ValueSource(ints = { 10, 
20, 50, 100, 150, 200 }) + void testSeveralMessagesCausesRetryableDomainErrorsInOnePartition(int delay) + { + createExampleConsumer(); + mockRecordHandler.exceptionalRecordHandlingDelay = Duration.ofMillis(delay); + + sendValidMessage(3); + sendMessageThatTriggersRetryableExceptionInDomain(3, 4); + sendValidMessage(3); + sendValidMessage(3); + sendMessageThatTriggersRetryableExceptionInDomain(3, 6); + sendMessageThatTriggersRetryableExceptionInDomain(3, 1); + sendValidMessage(3); + sendMessageThatTriggersRetryableExceptionInDomain(3, 66); + sendMessageThatTriggersRetryableExceptionInDomain(3, 5); + sendValidMessage(3); + sendValidMessage(3); + sendMessageThatTriggersRetryableExceptionInDomain(3, 66); + sendMessageThatTriggersRetryableExceptionInDomain(3, 6); + sendMessageThatTriggersRetryableExceptionInDomain(3, 66); + sendMessageThatTriggersRetryableExceptionInDomain(3, 3); + sendValidMessage(3); + sendMessageThatTriggersRetryableExceptionInDomain(3, 66); + sendValidMessage(3); + sendMessageThatTriggersRetryableExceptionInDomain(3, 1); + sendValidMessage(3); + + Awaitility + .await("All other messages are eventually consumed") + .atMost(Duration.ofSeconds(15)) + .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(16)); + } + + @DisplayName("Three messages, that trigger retryable exceptions in the domain-logic, are retried and all messages are eventually consumed") + @ParameterizedTest(name = "Number of failures for the 1. message in partition 3: {0}, number of failures for the 1. message in partition 6: {1}, number of failures for the 2. 
message in partition 6: {2}") + @CsvSource({ "1,1,1", "6,3,4", "4,5,2", "1,2,3", "6,6,6" }) + void testThreeMessagesCauseRetryableDomainErrors( + int numFailuresForMessageA, + int numFailuresForMessageB, + int numFailuresForMessageC) + { + createExampleConsumer(); + + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendMessageThatTriggersRetryableExceptionInDomain(3,numFailuresForMessageA); + sendValidMessage(4); + sendValidMessage(5); + sendMessageThatTriggersRetryableExceptionInDomain(6,numFailuresForMessageB); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendValidMessage(3); + sendValidMessage(4); + sendValidMessage(5); + sendMessageThatTriggersRetryableExceptionInDomain(6,numFailuresForMessageC); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendValidMessage(3); + sendValidMessage(4); + sendValidMessage(5); + sendValidMessage(6); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + + Awaitility + .await("All messages are eventually consumed") + .atMost(Duration.ofSeconds(20)) + .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(30)); + } + + @DisplayName("Three messages, that trigger retryable exceptions in the domain-logic, are retried, but one of them fails too often and is skipped, still all other messages are eventually consumed") + @ParameterizedTest(name = "Number of failures for the 1. message in partition 3: {0}, number of failures for the 1. message in partition 6: {1}, number of failures for the 2. 
message in partition 6: {2}") + @CsvSource({ "66,3,4", "4,66,2", "1,2,66" }) + void testThreeMessagesCauseRetryableDomainErrorsAndOneFailsTooOften( + int numFailuresForMessageA, + int numFailuresForMessageB, + int numFailuresForMessageC) + { + createExampleConsumer(); + + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendMessageThatTriggersRetryableExceptionInDomain(3,numFailuresForMessageA); + sendValidMessage(4); + sendValidMessage(5); + sendMessageThatTriggersRetryableExceptionInDomain(6,numFailuresForMessageB); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendValidMessage(3); + sendValidMessage(4); + sendValidMessage(5); + sendMessageThatTriggersRetryableExceptionInDomain(6,numFailuresForMessageC); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendValidMessage(3); + sendValidMessage(4); + sendValidMessage(5); + sendValidMessage(6); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + + Awaitility + .await("All other messages are eventually consumed") + .atMost(Duration.ofSeconds(20)) + .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(29)); + } + + @DisplayName("A message, that triggers a retryable exception in the domain-logic, is retried 3 times with a fixed back-of and all messages are eventually consumed") + @ParameterizedTest(name = "Back-Off millis: {0}") + @ValueSource(ints = { 100, 250, 500, 1000 }) + void testOneMessageIsRetriedWithFixedBackOff(int backOffMillis) + { + BackOff backOff = new FixedBackOff(backOffMillis, 3); + createExampleConsumer(backOff); + + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendMessageThatTriggersRetryableExceptionInDomain(3, 3); + sendValidMessage(4); + sendValidMessage(5); + sendValidMessage(6); + sendValidMessage(7); + 
sendValidMessage(8); + sendValidMessage(9); + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendValidMessage(3); + sendValidMessage(4); + sendValidMessage(5); + sendValidMessage(6); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + + Awaitility + .await("All messages are eventually consumed") + .atMost(Duration.ofSeconds(15)) + .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(20)); + } + static final String ID = "TEST"; static final String TOPIC = "ExampleConsumerTest_TEST"; static final int NUM_PARTITIONS = 10; + static final int NUM_RETRIES = 6; + static final int POLL_REQUEST_TIMEOUT_MS = 50; + static final int MAX_POLL_INTERVALL_MS = 500; + static final int MAX_TIME_PER_RECORD_MS = 100; + static final int FETCH_MAX_WAIT_MS = 50; + static final int MIN_SLACK_PER_POLL_INTERVAL_MS = 100; @Autowired KafkaTemplate kafkaTemplate; + @Autowired ApplicationProperties applicationProperties; + @Autowired KafkaProperties kafkaProperties; + @Autowired Clock clock; final Serializer serializer = createSerializer(); final long[] currentOffsets = new long[NUM_PARTITIONS]; @@ -167,19 +528,38 @@ public abstract class AbstractExampleConsumerTest abstract Consumer createConsumer(KafkaProperties properties); abstract V createValidMessage(); abstract V createMessageThatTriggersRuntimeException(); + abstract V createMessageThatTriggersNonRetryableException(); + abstract V createMessageThatTriggersRetryableException(int numFailures); - @BeforeEach - void createExampleConsumer(@Autowired KafkaProperties properties) + void createExampleConsumer() + { + createExampleConsumer(new FixedBackOff(0l, applicationProperties.getConsumerProperties().getNumRetries())); + } + + void createExampleConsumer(BackOff backOff) { exampleConsumer = new ExampleConsumer( ID, TOPIC, - createConsumer(properties), + createConsumer(kafkaProperties), mockRecordHandler, + clock, + 
applicationProperties.getConsumerProperties().getPollRequestTimeout(), + Duration.ofMillis(MAX_POLL_INTERVALL_MS), + applicationProperties.getConsumerProperties().getMaxTimePerRecord(), + applicationProperties.getConsumerProperties().getMinSlackPerPollInterval(), + backOff, () -> isTerminatedExceptionally.set(true)); } + @BeforeEach + void resetParameters() + { + mockRecordHandler.normalRecordHandlingDelay = Duration.ofMillis(0); + mockRecordHandler.exceptionalRecordHandlingDelay = Duration.ofMillis(0); + } + @AfterEach void resetSetup(@Autowired AdminClient adminClient) throws InterruptedException, ExecutionException { @@ -224,6 +604,16 @@ public abstract class AbstractExampleConsumerTest send(partition, createMessageThatTriggersRuntimeException()); } + private void sendMessageThatTriggersNonRetryableExceptionInDomain(int partition) + { + send(partition, serializer.serialize(TOPIC, createMessageThatTriggersNonRetryableException())); + } + + private void sendMessageThatTriggersRetryableExceptionInDomain(int partition, int numFailures) + { + send(partition, serializer.serialize(TOPIC, createMessageThatTriggersRetryableException(numFailures))); + } + private void send(int partition, V message) { send(partition, serializer.serialize(TOPIC, message)); @@ -253,5 +643,11 @@ public abstract class AbstractExampleConsumerTest properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBroker); return AdminClient.create(properties); } + + @Bean + Clock clock() + { + return Clock.systemDefaultZone(); + } } } diff --git a/src/test/java/de/juplo/kafka/AbstractMockRecordHandler.java b/src/test/java/de/juplo/kafka/AbstractMockRecordHandler.java index c8617263..939ee9d0 100644 --- a/src/test/java/de/juplo/kafka/AbstractMockRecordHandler.java +++ b/src/test/java/de/juplo/kafka/AbstractMockRecordHandler.java @@ -3,11 +3,20 @@ package de.juplo.kafka; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import java.time.Duration; +import java.util.HashMap; 
+import java.util.Map; + @RequiredArgsConstructor @Slf4j public abstract class AbstractMockRecordHandler implements RecordHandler { + final Map retriableErrors = new HashMap<>(); + + Duration normalRecordHandlingDelay; + Duration exceptionalRecordHandlingDelay; + private int numMessagesHandled = 0; public int getNumMessagesHandled() @@ -21,17 +30,37 @@ public abstract class AbstractMockRecordHandler implements RecordHandler return VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION; } + @Override + Long createMessageThatTriggersNonRetryableException() + { + return VALUE_THAT_TRIGGERS_NONRETRYABLEEXCEPTION; + } + + @Override + Long createMessageThatTriggersRetryableException(int numFailures) + { + return VALUE_THAT_TRIGGERS_RETRYABLEEXCEPTION * numFailures; + } + public final static long VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION = -1; + public final static long VALUE_THAT_TRIGGERS_RETRYABLEEXCEPTION = -2; + public final static long VALUE_THAT_TRIGGERS_NONRETRYABLEEXCEPTION = -3; } diff --git a/src/test/java/de/juplo/kafka/LongMockRecordHandler.java b/src/test/java/de/juplo/kafka/LongMockRecordHandler.java index a6a16591..0833116c 100644 --- a/src/test/java/de/juplo/kafka/LongMockRecordHandler.java +++ b/src/test/java/de/juplo/kafka/LongMockRecordHandler.java @@ -2,19 +2,73 @@ package de.juplo.kafka; import lombok.extern.slf4j.Slf4j; -import static de.juplo.kafka.LongExampleConsumerTest.VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION; +import static de.juplo.kafka.LongExampleConsumerTest.*; @Slf4j public class LongMockRecordHandler extends AbstractMockRecordHandler { - void generateError(Long value) + @Override + void generateError( + OffsetInPartition offset, + Long value) throws RetryableErrorException, NonRetryableErrorException { + if (value == null || value > -1) + { + // Valid message... 
+ return; + } + if (value == VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION) { throw new RuntimeException("Unexpected application error!"); } - log.info("Not specifically mapped error: {}", value); + if (value == VALUE_THAT_TRIGGERS_NONRETRYABLEEXCEPTION) + { + throw new NonRetryableErrorException("Non-Retryable application error!"); + } + + if ((float)value % (float) VALUE_THAT_TRIGGERS_RETRYABLEEXCEPTION == 0f) + { + int totalOccurrences = (int) (value / VALUE_THAT_TRIGGERS_RETRYABLEEXCEPTION); + int occurrence = retriableErrors.compute( + offset, + (k, v) -> + { + if (v == null) + { + v = totalOccurrences; + } + else + { + v--; + } + + return v; + }); + + if (occurrence <= 0) + { + retriableErrors.remove(offset); + } + else + { + log.debug( + "Simulating occurrence #{} of {} for a retryable error at offset {} in partition {}", + totalOccurrences - occurrence + 1, + totalOccurrences, + offset.offset(), + offset.partition()); + sleep(exceptionalRecordHandlingDelay); + throw new RetryableErrorException("Retryable application error! Occurrence #" + (totalOccurrences - occurrence + 1)); + } + + log.info("Simulating a resolved retryable error after {} occurrences of the error", totalOccurrences); + } + else + { + log.warn("Not specifically mapped error: {}", value); + } } } -- 2.20.1