From e612ce8a49cdf7b08c3a71981d75feddf238084e Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Mon, 14 Apr 2025 22:13:57 +0200 Subject: [PATCH] =?utf8?q?Version=20des=20`spring-consumer`=20mit=20einer?= =?utf8?q?=20vollst=C3=A4ndigen=20Fehlerbehandlung?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit * Exceptions & Konfig für wiederholbare und nicht wiederholbare Fehler * RED: Erwartetes Verhalten für die Fehlerbehandlung definiert * GREEN: Erweitertes Error-Handling implementiert * Timout für den Poll-Request konfigurierbar gemacht * Timings für den `ExampleConsumerTest` enger gezogen * Keine Verzögerung für Fehler in den Retry-Tests * Zusätzliche Logging-Meldung für Retry-Ablauf * Nicht ausgewertete Unterscheidung entfernt * Erzeugung des `ExampleConsumer` im Tests über Methode konfigurierbar * GREEN: Erwartetes Verhalten für unterschiedliche Delays definiert * Slack für Poll-Intervall ist explizit konfigurierbar * Test für Retries mit fixem Back-Off in `ExampleConsumerTest` ergänzt * Der Offset eines inaktiven `BackOffState` sollte ein ungültiger Wert sein * Innere Klasse `BackOffState` statisch gemacht * Innere Klasse `BackOffState` extrahiert * Logging in `BackOffState` verbessert * Eindeutigere Methodennamen in `BackOffState` * RED - Unit-Test für `BackOffState` implementiert * GREEN - Fehler in der Initialisierung von `BackOffState` korrigiert * Doppelten Code in `BackOffStateTest` in Methoden ausgelagert * Mocking mit `@Mock` auf Klassenebene erspart Parameter-Schlacht * Aussagelose Tests aus `BackOffStateTest` entfernt * BackOff-Zeit in `BackOffStateTest` in statische Variable ausgelagert * Logging-Meldung des `BackOffState` vereinfacht und verbessert * RED: Korrigiertes Verhalten für `BackOffState` definiert * GREEN: Implementierung von `BackOffState` korrigiert * Umstellung des `ExampleConsumerTest` auf AssertJ * `fetch.max.wait` konfigurierbar gemacht * Test für viele Fehler in einer Partition in `BackOffStateTest` ergänzt * Überprüfung des ``BackOffState``-Zustands berücksichtigt immer den Offset * Bedingungsloser Reset (und besserer Methoden-Name) für `BackOffState` * `BackOffState` wird nur 1x erzeugt und danach zurückgesetzt und gestartet * Überflüssiges Attribut in `BackOffState` entfernt Neu: * Test für unterschiedliche Nachrichten-Typen vorbereitet. --- .../juplo/kafka/ApplicationConfiguration.java | 35 +- .../de/juplo/kafka/ApplicationProperties.java | 10 + .../java/de/juplo/kafka/BackOffState.java | 98 +++ .../java/de/juplo/kafka/ExampleConsumer.java | 170 ++++- .../kafka/NonRetryableErrorException.java | 9 + .../java/de/juplo/kafka/RecordHandler.java | 2 +- .../juplo/kafka/RetryableErrorException.java | 9 + src/main/resources/application.yml | 11 + .../kafka/AbstractExampleConsumerTest.java | 653 ++++++++++++++++++ .../kafka/AbstractMockRecordHandler.java | 66 ++ .../java/de/juplo/kafka/BackOffStateTest.java | 304 ++++++++ .../juplo/kafka/LongExampleConsumerTest.java | 64 ++ .../de/juplo/kafka/LongMockRecordHandler.java | 74 ++ 13 files changed, 1478 insertions(+), 27 deletions(-) create mode 100644 src/main/java/de/juplo/kafka/BackOffState.java create mode 100644 src/main/java/de/juplo/kafka/NonRetryableErrorException.java create mode 100644 src/main/java/de/juplo/kafka/RetryableErrorException.java create mode 100644 src/test/java/de/juplo/kafka/AbstractExampleConsumerTest.java create mode 100644 src/test/java/de/juplo/kafka/AbstractMockRecordHandler.java create mode 100644 src/test/java/de/juplo/kafka/BackOffStateTest.java create mode 100644 src/test/java/de/juplo/kafka/LongExampleConsumerTest.java create mode 100644 src/test/java/de/juplo/kafka/LongMockRecordHandler.java diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 26f5c54c..07c9bd21 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -2,14 +2,19 @@ package de.juplo.kafka; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.producer.ProducerConfig; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.util.backoff.BackOff; +import org.springframework.util.backoff.FixedBackOff; import java.time.Clock; +import java.time.Duration; +import java.util.Properties; @Configuration @@ -17,22 +22,38 @@ import java.time.Clock; @Slf4j public class ApplicationConfiguration { + public final static String MAX_POLL_INTERVALL_CONFIG_KEY = "max.poll.interval.ms"; + public final static Duration MAX_POLL_INTERVALL_DEFAULT_VALUE = Duration.ofMinutes(5); @Bean public ExampleConsumer exampleConsumer( Consumer kafkaConsumer, RecordHandler recordHandler, - ConsumerHealthIndicatorAwareRebalanceListener consumerHealthIndicatorAwareRebalanceListener, ApplicationProperties properties, KafkaProperties kafkaProperties, + Clock clock, + BackOff backOffStrategy, ConfigurableApplicationContext applicationContext) { + String maxPollIntervalMs = kafkaProperties + .getConsumer() + .getProperties() + .get(MAX_POLL_INTERVALL_CONFIG_KEY); + Duration maxPollInterval = maxPollIntervalMs == null + ? MAX_POLL_INTERVALL_DEFAULT_VALUE + : Duration.ofMillis(Integer.valueOf(maxPollIntervalMs)); + return new ExampleConsumer<>( properties.getClientId(), properties.getConsumerProperties().getTopic(), kafkaConsumer, recordHandler, - consumerHealthIndicatorAwareRebalanceListener, + clock, + properties.getConsumerProperties().getPollRequestTimeout(), + maxPollInterval, + properties.getConsumerProperties().getMaxTimePerRecord(), + properties.getConsumerProperties().getMinSlackPerPollInterval(), + backOffStrategy, () -> applicationContext.close()); } @@ -43,15 +64,9 @@ public class ApplicationConfiguration } @Bean - public ConsumerHealthIndicatorAwareRebalanceListener rebalanceListener(ConsumerHealthIndicator consumerHealthIndicator) - { - return new ConsumerHealthIndicatorAwareRebalanceListener(consumerHealthIndicator); - } - - @Bean - public ConsumerHealthIndicator consumerHealthIndicator(Clock clock) + public BackOff backOffStrategy(ApplicationProperties properties) { - return new ConsumerHealthIndicator(clock); + return new FixedBackOff(0l, properties.getConsumerProperties().getNumRetries()); } @Bean diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java index 22c755e8..1d4a32b5 100644 --- a/src/main/java/de/juplo/kafka/ApplicationProperties.java +++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java @@ -7,6 +7,8 @@ import lombok.Setter; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.validation.annotation.Validated; +import java.time.Duration; + @ConfigurationProperties(prefix = "juplo") @Validated @@ -32,5 +34,13 @@ public class ApplicationProperties @NotNull @NotEmpty private String topic; + @NotNull + private Duration pollRequestTimeout; + @NotNull + private Duration maxTimePerRecord; + @NotNull + private Duration minSlackPerPollInterval; + @NotNull + private int numRetries; } } diff --git a/src/main/java/de/juplo/kafka/BackOffState.java b/src/main/java/de/juplo/kafka/BackOffState.java new file mode 100644 index 00000000..8c6785e3 --- /dev/null +++ b/src/main/java/de/juplo/kafka/BackOffState.java @@ -0,0 +1,98 @@ +package de.juplo.kafka; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.util.backoff.BackOff; +import org.springframework.util.backoff.BackOffExecution; + +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; + + +@RequiredArgsConstructor +@Slf4j +class BackOffState +{ + private final String logPrefix; + private final Clock clock; + private final BackOff backOffStrategy; + + @Getter + private long offset; + private BackOffExecution backOffExecution; + private int numRetries = 0; + private Instant timeNextRetryIsDue; + + + void start(long offset) + { + this.offset = offset; + log.info("{} - Back-Off requested for offset={}", logPrefix, offset); + backOffExecution = backOffStrategy.start(); + initializeNextBackOff(); + } + + boolean isWaitingForNextRetry() + { + if (timeNextRetryIsDue == null) + { + return false; + } + + Instant now = clock.instant(); + Duration remaining = Duration.between(now, timeNextRetryIsDue); + if (remaining.isNegative()) + { + log.info( + "{} - {}. retry for offset={}, lateness: {}", + logPrefix, + numRetries, + offset, + remaining.abs()); + initializeNextBackOff(); + return false; + } + else + { + log.info( + "{} - Next retry for offset={} is due in {}", + logPrefix, + offset, + remaining); + return true; + } + } + + boolean isStarted(long offset) + { + return this.offset == offset && backOffExecution != null; + } + + boolean isCompleted(long offset) + { + return this.offset == offset && timeNextRetryIsDue == null; + } + + void reset() + { + timeNextRetryIsDue = null; + offset = -1l; + } + + private void initializeNextBackOff() + { + long backOffMillis = backOffExecution.nextBackOff(); + + if (backOffMillis == BackOffExecution.STOP) + { + timeNextRetryIsDue = null; + } + else + { + numRetries++; + timeNextRetryIsDue = clock.instant().plusMillis(backOffMillis); + } + } +} diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index bd5cac74..1cd778ec 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -5,21 +5,33 @@ import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.RecordDeserializationException; import org.apache.kafka.common.errors.WakeupException; +import org.springframework.util.backoff.BackOff; +import java.time.Clock; import java.time.Duration; +import java.time.Instant; import java.util.Arrays; +import java.util.Collection; +import java.util.List; @Slf4j -public class ExampleConsumer implements Runnable +public class ExampleConsumer implements ConsumerRebalanceListener, Runnable { private final String id; private final String topic; private final Consumer consumer; private final RecordHandler recordHandler; - private final ConsumerRebalanceListener rebalanceListener; private final Thread workerThread; + private final Clock clock; + private final Duration pollRequestTimeout; + private final Duration maxPollInterval; + private final Duration minTimeForNextRecord; + private final BackOff backOffStrategy; + private final BackOffState[] backOffState; private final Runnable closeCallback; private long consumed = 0; @@ -30,14 +42,27 @@ public class ExampleConsumer implements Runnable String topic, Consumer consumer, RecordHandler recordHandler, - ConsumerRebalanceListener rebalanceListener, + Clock clock, + Duration pollRequestTimeout, + Duration maxPollInterval, + Duration maxTimePerRecord, + Duration minSlackPerPollInterval, + BackOff backOffStrategy, Runnable closeCallback) { this.id = clientId; this.topic = topic; this.consumer = consumer; this.recordHandler = recordHandler; - this.rebalanceListener = rebalanceListener; + this.clock = clock; + this.pollRequestTimeout = pollRequestTimeout; + this.maxPollInterval = maxPollInterval; + this.minTimeForNextRecord = maxTimePerRecord.plus(minSlackPerPollInterval); + this.backOffStrategy = backOffStrategy; + + int numPartitions = consumer.partitionsFor(topic).size(); + log.info("{} - Topic {} has {} partitions", id, topic, numPartitions); + this.backOffState = new BackOffState[numPartitions]; workerThread = new Thread(this, "ExampleConsumer Worker-Thread"); workerThread.start(); @@ -52,21 +77,117 @@ public class ExampleConsumer implements Runnable try { log.info("{} - Subscribing to topic {}", id, topic); - consumer.subscribe(Arrays.asList(topic), rebalanceListener); + consumer.subscribe(Arrays.asList(topic), this); while (true) { - ConsumerRecords records = consumer.poll(Duration.ofSeconds(1)); - - log.info("{} - Received {} messages", id, records.count()); - for (ConsumerRecord record : records) + try + { + ConsumerRecords records = consumer.poll(pollRequestTimeout); + + log.info("{} - Received {} messages", id, records.count()); + + Instant deadline = clock.instant().plus(maxPollInterval); + boolean abortCurrentPoll = false; + + for (TopicPartition topicPartition : records.partitions()) + { + if (backOffState[topicPartition.partition()].isWaitingForNextRetry()) + { + log.info("{} - {} is blocked, because it is waiting for a retry", id, topicPartition); + consumer.seek(topicPartition, backOffState[topicPartition.partition()].getOffset()); + continue; + } + + List> recordsForPartition = records.records(topicPartition); + log.debug( + "{} - Received {} messages for partition {}", + id, + recordsForPartition.size(), + topicPartition); + + for (ConsumerRecord record : recordsForPartition) + { + if (abortCurrentPoll) + { + consumer.seek(topicPartition, record.offset()); + break; + } + + Instant now = clock.instant(); + Duration timeLeft = Duration.between(now, deadline); + log.trace("{} - Time left for current poll: {}", id, timeLeft); + + if (timeLeft.minus(minTimeForNextRecord).isNegative()) + { + log.info( + "{} - Aborting record handling, because only {} are left until the poll-interval expires!", + id, + timeLeft); + abortCurrentPoll = true; + consumer.seek(topicPartition, record.offset()); + break; + } + + try + { + handleRecord( + record.topic(), + record.partition(), + record.offset(), + record.key(), + record.value()); + } + catch (RetryableErrorException e) + { + // Seeking to the offset of the record, that raised the exception, and + // leaving the loop afterwards, retries the record + int partition = topicPartition.partition(); + if (!backOffState[partition].isStarted(record.offset())) + { + log.info("{} - First occurrence of a retryable error: {}", id, e.toString()); + backOffState[partition].start(record.offset()); + consumer.seek(topicPartition, record.offset()); + break; + } + else + { + if (backOffState[partition].isCompleted(record.offset())) + { + log.warn("{} - Ignoring retryable error: {}", id, e.toString()); + } + else + { + log.info( + "{} - Retry in progress for offset={} in {}, error: {}", + id, + record.offset(), + partition, + e.toString()); + consumer.seek(topicPartition, record.offset()); + break; + } + } + } + catch (NonRetryableErrorException e) + { + // Just ignore, to skip + log.warn("{} - Ignoring non-retryable error: {}", id, e.toString()); + } + + backOffState[topicPartition.partition()].reset(); + } + } + } + catch (RecordDeserializationException e) { - handleRecord( - record.topic(), - record.partition(), - record.offset(), - record.key(), - record.value()); + log.error( + "{} - Ignoring invalid record for offset {} on partition {}: {}", + id, + e.offset(), + e.topicPartition(), + e.getMessage()); + consumer.seek(e.topicPartition(), e.offset() + 1); } } } @@ -94,7 +215,7 @@ public class ExampleConsumer implements Runnable Integer partition, Long offset, K key, - V value) + V value) throws RetryableErrorException, NonRetryableErrorException { consumed++; log.info("{} - partition={}-{}, offset={}: {}={}", id, topic, partition, offset, key, value); @@ -102,6 +223,23 @@ public class ExampleConsumer implements Runnable } + @Override + public void onPartitionsAssigned(Collection partitions) + { + partitions.forEach(topicPartition -> + backOffState[topicPartition.partition()] = new BackOffState( + id + " - partition=" + topicPartition.partition(), + clock, + backOffStrategy)); + } + + @Override + public void onPartitionsRevoked(Collection partitions) + { + partitions.forEach(tp -> backOffState[tp.partition()] = null); + } + + public void shutdown() throws InterruptedException { log.info("{} - Waking up the consumer", id); diff --git a/src/main/java/de/juplo/kafka/NonRetryableErrorException.java b/src/main/java/de/juplo/kafka/NonRetryableErrorException.java new file mode 100644 index 00000000..89a05c17 --- /dev/null +++ b/src/main/java/de/juplo/kafka/NonRetryableErrorException.java @@ -0,0 +1,9 @@ +package de.juplo.kafka; + +public class NonRetryableErrorException extends Exception +{ + public NonRetryableErrorException(String message) + { + super(message); + } +} diff --git a/src/main/java/de/juplo/kafka/RecordHandler.java b/src/main/java/de/juplo/kafka/RecordHandler.java index a7b65af2..fe2bbbc6 100644 --- a/src/main/java/de/juplo/kafka/RecordHandler.java +++ b/src/main/java/de/juplo/kafka/RecordHandler.java @@ -7,5 +7,5 @@ public interface RecordHandler Integer partition, Long offset, K key, - V value); + V value) throws RetryableErrorException, NonRetryableErrorException; } diff --git a/src/main/java/de/juplo/kafka/RetryableErrorException.java b/src/main/java/de/juplo/kafka/RetryableErrorException.java new file mode 100644 index 00000000..885f15fc --- /dev/null +++ b/src/main/java/de/juplo/kafka/RetryableErrorException.java @@ -0,0 +1,9 @@ +package de.juplo.kafka; + +public class RetryableErrorException extends Exception +{ + public RetryableErrorException(String message) + { + super(message); + } +} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 8b37911b..b90c023b 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -1,6 +1,10 @@ juplo: consumer: topic: test + poll-request-timeout: 1s + max-time-per-record: 30s + min-slack-per-poll-interval: 1s + num-retries: 10 management: endpoint: health: @@ -29,6 +33,13 @@ spring: client-id: DEV consumer: group-id: my-group + value-deserializer: org.apache.kafka.common.serialization.LongDeserializer + auto-offset-reset: earliest + auto-commit-interval: 5s + max-poll-records: 500 + fetch-max-wait: 500ms + properties: + max.poll.interval.ms: 300000 logging: level: root: INFO diff --git a/src/test/java/de/juplo/kafka/AbstractExampleConsumerTest.java b/src/test/java/de/juplo/kafka/AbstractExampleConsumerTest.java new file mode 100644 index 00000000..e726a573 --- /dev/null +++ b/src/test/java/de/juplo/kafka/AbstractExampleConsumerTest.java @@ -0,0 +1,653 @@ +package de.juplo.kafka; + +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.RecordsToDelete; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.Serializer; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; +import org.junit.jupiter.params.provider.ValueSource; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration; +import org.springframework.boot.autoconfigure.kafka.KafkaProperties; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.util.backoff.BackOff; +import org.springframework.util.backoff.FixedBackOff; + +import java.time.Clock; +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static de.juplo.kafka.AbstractExampleConsumerTest.*; +import static org.assertj.core.api.Assertions.assertThat; + + +@SpringBootTest( + classes = { + KafkaAutoConfiguration.class, + ApplicationProperties.class, + AbstractExampleConsumerTest.ConsumerRunnableTestConfig.class, + }, + properties = { + "juplo.consumer.poll-request-timeout=" + POLL_REQUEST_TIMEOUT_MS + "ms", + "juplo.consumer.max-time-per-record=" + MAX_TIME_PER_RECORD_MS + "ms", + "juplo.consumer.min-slack-per-poll-interval=" + MIN_SLACK_PER_POLL_INTERVAL_MS + "ms", + "juplo.consumer.num-retries=" + NUM_RETRIES, + "spring.kafka.consumer.auto-offset-reset=earliest", + "spring.kafka.consumer.properties.max.poll.interval.ms=" + MAX_POLL_INTERVALL_MS, + "spring.kafka.consumer.fetch-max-wait=" + FETCH_MAX_WAIT_MS + "ms", + "spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.ByteArraySerializer", + "logging.level.de.juplo.kafka=TRACE", + }) +@EmbeddedKafka(topics = TOPIC, partitions = NUM_PARTITIONS) +public abstract class AbstractExampleConsumerTest +{ + @DisplayName("All messages are consumed as expected") + @Test + void testOnlyValidMessages() + { + createExampleConsumer(); + + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendValidMessage(3); + sendValidMessage(4); + sendValidMessage(5); + sendValidMessage(6); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendValidMessage(3); + sendValidMessage(4); + sendValidMessage(5); + sendValidMessage(6); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + + Awaitility + .await("All messages are consumed") + .atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(20)); + } + + @DisplayName("Delayed messages are consumed as expected") + @ParameterizedTest(name = "delay for message-consumption: {0}") + @ValueSource(ints = { 10, 25, 50, 75, 100 }) + void testOnlyValidMessagesButAllDelayed(int delay) + { + createExampleConsumer(); + mockRecordHandler.normalRecordHandlingDelay = Duration.ofMillis(delay); + + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendValidMessage(3); + sendValidMessage(4); + sendValidMessage(5); + sendValidMessage(6); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendValidMessage(3); + sendValidMessage(4); + sendValidMessage(5); + sendValidMessage(6); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + + Awaitility + .await("All messages are consumed") + .atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(20)); + } + + @DisplayName("A deserialization exception is skipped and all valid messages are consumed") + @Test + void testDeserializationException() + { + createExampleConsumer(); + + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendNonDeserializableMessage(3); + sendValidMessage(4); + sendValidMessage(5); + sendValidMessage(6); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendValidMessage(3); + sendValidMessage(4); + sendValidMessage(5); + sendValidMessage(6); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + + Awaitility + .await("All valid messages are consumed") + .atMost(Duration.ofSeconds(15)) + .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(19)); + } + + @DisplayName("A message, that triggers an unexpected exception in the domain-logic, exits the application") + @Test + void testUnexpectedDomainError() throws Exception + { + createExampleConsumer(); + + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendValidMessage(3); + sendValidMessage(4); + sendValidMessage(5); + sendValidMessage(6); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendMessageThatTriggersRuntimeExceptionInDomain(3); + sendValidMessage(4); + sendValidMessage(5); + sendValidMessage(6); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + + Awaitility + .await("The ConsumerRunnable is exited by an unexpected exception") + .atMost(Duration.ofSeconds(5)) + .pollInterval(Duration.ofMillis(250)) + .untilAsserted(() -> assertThat(isTerminatedExceptionally.get()).isTrue()); + } + + @DisplayName("A message, that triggers an non-retryable exception in the domain-logic, is skipped and all other messages are consumed") + @Test + void testNonRetryableDomainError() throws Exception + { + createExampleConsumer(); + + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendMessageThatTriggersNonRetryableExceptionInDomain(3); + sendValidMessage(4); + sendValidMessage(5); + sendValidMessage(6); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendValidMessage(3); + sendValidMessage(4); + sendValidMessage(5); + sendValidMessage(6); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + + Awaitility + .await("All other valid messages are consumed") + .atMost(Duration.ofSeconds(15)) + .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(19)); + } + + @DisplayName("A message, that triggers a retryable exception in the domain-logic, is retried and all messages are eventually consumed") + @ParameterizedTest(name = "Number of failures for the 1. message in partition 3: {0}") + @ValueSource(ints = { 1, 2, 3, 4, 5, 6 }) + void testOneMessageCausesRetryableDomainErrors(int numFailures) + { + createExampleConsumer(); + + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendMessageThatTriggersRetryableExceptionInDomain(3,numFailures); + sendValidMessage(4); + sendValidMessage(5); + sendValidMessage(6); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendValidMessage(3); + sendValidMessage(4); + sendValidMessage(5); + sendValidMessage(6); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + + Awaitility + .await("All messages are eventually consumed") + .atMost(Duration.ofSeconds(15)) + .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(20)); + } + + @DisplayName("All messages on a partition are delayed and a message, that triggers a retryable exception in the domain-logic, is retried and all messages are eventually consumed") + @ParameterizedTest(name = "Delay for normal messages: {0}ms") + @ValueSource(ints = { 10, 20, 50, 100, 150, 200 }) + void testOneMessageCausesRetryableDomainErrorsWhileAllMessagesAreDelayed(int delay) + { + createExampleConsumer(); + mockRecordHandler.normalRecordHandlingDelay = Duration.ofMillis(delay); + mockRecordHandler.exceptionalRecordHandlingDelay = Duration.ofMillis(100); + + sendValidMessage(3); + sendValidMessage(3); + sendValidMessage(3); + sendValidMessage(3); + sendValidMessage(3); + sendValidMessage(3); + sendValidMessage(3); + sendValidMessage(3); + sendValidMessage(3); + sendValidMessage(3); + sendValidMessage(3); + sendValidMessage(3); + sendValidMessage(3); + sendValidMessage(3); + sendValidMessage(3); + sendValidMessage(3); + sendValidMessage(3); + sendValidMessage(3); + sendMessageThatTriggersRetryableExceptionInDomain(3, 1); + sendValidMessage(3); + + Awaitility + .await("All messages are eventually consumed") + .atMost(Duration.ofSeconds(15)) + .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(20)); + } + + @DisplayName("A message, that triggers a retryable exception in the domain-logic, but fails too often, is skipped and all other messages are eventually consumed") + @Test + void testOneMessageCausesRetryableDomainErrors() + { + createExampleConsumer(); + + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendMessageThatTriggersRetryableExceptionInDomain(3,66); + sendValidMessage(4); + sendValidMessage(5); + sendValidMessage(6); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendValidMessage(3); + sendValidMessage(4); + sendValidMessage(5); + sendValidMessage(6); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + + Awaitility + .await("All other messages are eventually consumed") + .atMost(Duration.ofSeconds(15)) + .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(19)); + } + + @DisplayName("Several messages are triggering retryable exception in one partition, some of them fail so often, that they are skipped and all successful messages are eventually consumed") + @ParameterizedTest(name = "Delay for errors: {0}ms") + @ValueSource(ints = { 10, 20, 50, 100, 150, 200 }) + void testSeveralMessagesCausesRetryableDomainErrorsInOnePartition(int delay) + { + createExampleConsumer(); + mockRecordHandler.exceptionalRecordHandlingDelay = Duration.ofMillis(delay); + + sendValidMessage(3); + sendMessageThatTriggersRetryableExceptionInDomain(3, 4); + sendValidMessage(3); + sendValidMessage(3); + sendMessageThatTriggersRetryableExceptionInDomain(3, 6); + sendMessageThatTriggersRetryableExceptionInDomain(3, 1); + sendValidMessage(3); + sendMessageThatTriggersRetryableExceptionInDomain(3, 66); + sendMessageThatTriggersRetryableExceptionInDomain(3, 5); + sendValidMessage(3); + sendValidMessage(3); + sendMessageThatTriggersRetryableExceptionInDomain(3, 66); + sendMessageThatTriggersRetryableExceptionInDomain(3, 6); + sendMessageThatTriggersRetryableExceptionInDomain(3, 66); + sendMessageThatTriggersRetryableExceptionInDomain(3, 3); + sendValidMessage(3); + sendMessageThatTriggersRetryableExceptionInDomain(3, 66); + sendValidMessage(3); + sendMessageThatTriggersRetryableExceptionInDomain(3, 1); + sendValidMessage(3); + + Awaitility + .await("All other messages are eventually consumed") + .atMost(Duration.ofSeconds(15)) + .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(16)); + } + + @DisplayName("Three messages, that trigger retryable exceptions in the domain-logic, are retried and all messages are eventually consumed") + @ParameterizedTest(name = "Number of failures for the 1. message in partition 3: {0}, number of failures for the 1. message in partition 6: {1}, number of failures for the 2. message in partition 6: {2}") + @CsvSource({ "1,1,1", "6,3,4", "4,5,2", "1,2,3", "6,6,6" }) + void testThreeMessagesCauseRetryableDomainErrors( + int numFailuresForMessageA, + int numFailuresForMessageB, + int numFailuresForMessageC) + { + createExampleConsumer(); + + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendMessageThatTriggersRetryableExceptionInDomain(3,numFailuresForMessageA); + sendValidMessage(4); + sendValidMessage(5); + sendMessageThatTriggersRetryableExceptionInDomain(6,numFailuresForMessageB); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendValidMessage(3); + sendValidMessage(4); + sendValidMessage(5); + sendMessageThatTriggersRetryableExceptionInDomain(6,numFailuresForMessageC); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendValidMessage(3); + sendValidMessage(4); + sendValidMessage(5); + sendValidMessage(6); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + + Awaitility + .await("All messages are eventually consumed") + .atMost(Duration.ofSeconds(20)) + .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(30)); + } + + @DisplayName("Three messages, that trigger retryable exceptions in the domain-logic, are retried, but one of them fails too often and is skipped, still all other messages are eventually consumed") + @ParameterizedTest(name = "Number of failures for the 1. message in partition 3: {0}, number of failures for the 1. message in partition 6: {1}, number of failures for the 2. message in partition 6: {2}") + @CsvSource({ "66,3,4", "4,66,2", "1,2,66" }) + void testThreeMessagesCauseRetryableDomainErrorsAndOneFailsTooOften( + int numFailuresForMessageA, + int numFailuresForMessageB, + int numFailuresForMessageC) + { + createExampleConsumer(); + + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendMessageThatTriggersRetryableExceptionInDomain(3,numFailuresForMessageA); + sendValidMessage(4); + sendValidMessage(5); + sendMessageThatTriggersRetryableExceptionInDomain(6,numFailuresForMessageB); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendValidMessage(3); + sendValidMessage(4); + sendValidMessage(5); + sendMessageThatTriggersRetryableExceptionInDomain(6,numFailuresForMessageC); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendValidMessage(3); + sendValidMessage(4); + sendValidMessage(5); + sendValidMessage(6); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + + Awaitility + .await("All other messages are eventually consumed") + .atMost(Duration.ofSeconds(20)) + .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(29)); + } + + @DisplayName("A message, that triggers a retryable exception in the domain-logic, is retried 3 times with a fixed back-of and all messages are eventually consumed") + @ParameterizedTest(name = "Back-Off millis: {0}") + @ValueSource(ints = { 100, 250, 500, 1000 }) + void testOneMessageIsRetriedWithFixedBackOff(int backOffMillis) + { + BackOff backOff = new FixedBackOff(backOffMillis, 3); + createExampleConsumer(backOff); + + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendMessageThatTriggersRetryableExceptionInDomain(3, 3); + sendValidMessage(4); + sendValidMessage(5); + sendValidMessage(6); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendValidMessage(3); + sendValidMessage(4); + sendValidMessage(5); + sendValidMessage(6); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + + Awaitility + .await("All messages are eventually consumed") + .atMost(Duration.ofSeconds(15)) + .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(20)); + } + + + static final String ID = "TEST"; + static final String TOPIC = "ExampleConsumerTest_TEST"; + static final int NUM_PARTITIONS = 10; + static final int NUM_RETRIES = 6; + static final int POLL_REQUEST_TIMEOUT_MS = 50; + static final int MAX_POLL_INTERVALL_MS = 500; + static final int MAX_TIME_PER_RECORD_MS = 100; + static final int FETCH_MAX_WAIT_MS = 50; + static final int MIN_SLACK_PER_POLL_INTERVAL_MS = 100; + + @Autowired + KafkaTemplate kafkaTemplate; + @Autowired ApplicationProperties applicationProperties; + @Autowired KafkaProperties kafkaProperties; + @Autowired Clock clock; + + final Serializer serializer = createSerializer(); + final long[] currentOffsets = new long[NUM_PARTITIONS]; + + long nextMessage = 1; + + final AbstractMockRecordHandler mockRecordHandler = createMockRecordHandler(); + final AtomicBoolean isTerminatedExceptionally = new AtomicBoolean(); + + ExampleConsumer exampleConsumer; + + + abstract Serializer createSerializer(); + abstract AbstractMockRecordHandler createMockRecordHandler(); + abstract Consumer createConsumer(KafkaProperties properties); + abstract V createValidMessage(); + abstract V createMessageThatTriggersRuntimeException(); + abstract V createMessageThatTriggersNonRetryableException(); + abstract V createMessageThatTriggersRetryableException(int numFailures); + + + void createExampleConsumer() + { + createExampleConsumer(new FixedBackOff(0l, applicationProperties.getConsumerProperties().getNumRetries())); + } + + void createExampleConsumer(BackOff backOff) + { + exampleConsumer = new ExampleConsumer( + ID, + TOPIC, + createConsumer(kafkaProperties), + mockRecordHandler, + clock, + applicationProperties.getConsumerProperties().getPollRequestTimeout(), + Duration.ofMillis(MAX_POLL_INTERVALL_MS), + applicationProperties.getConsumerProperties().getMaxTimePerRecord(), + applicationProperties.getConsumerProperties().getMinSlackPerPollInterval(), + backOff, + () -> isTerminatedExceptionally.set(true)); + } + + @BeforeEach + void resetParameters() + { + mockRecordHandler.normalRecordHandlingDelay = Duration.ofMillis(0); + mockRecordHandler.exceptionalRecordHandlingDelay = Duration.ofMillis(0); + } + + @AfterEach + void resetSetup(@Autowired AdminClient adminClient) throws InterruptedException, ExecutionException + { + exampleConsumer.shutdown(); + adminClient + .deleteRecords(recordsToDelete()) + .all() + .get(); + mockRecordHandler.clear(); + nextMessage = 1; + isTerminatedExceptionally.set(false); + } + + private Map recordsToDelete() + { + return IntStream + .range(0, NUM_PARTITIONS) + .filter(i -> currentOffsets[i] > 0) + .mapToObj(i -> Integer.valueOf(i)) + .collect(Collectors.toMap( + i -> new TopicPartition(TOPIC, i), + i -> recordsToDelete(i))); + } + + private RecordsToDelete recordsToDelete(int partition) + { + return RecordsToDelete.beforeOffset(currentOffsets[partition] + 1); + } + + private void sendValidMessage(int partition) + { + send(partition, createValidMessage()); + } + + private void sendNonDeserializableMessage(int partition) + { + send(partition, "BOOM!".getBytes()); + } + + private void sendMessageThatTriggersRuntimeExceptionInDomain(int partition) + { + send(partition, createMessageThatTriggersRuntimeException()); + } + + private void sendMessageThatTriggersNonRetryableExceptionInDomain(int partition) + { + send(partition, serializer.serialize(TOPIC, createMessageThatTriggersNonRetryableException())); + } + + private void sendMessageThatTriggersRetryableExceptionInDomain(int partition, int numFailures) + { + send(partition, serializer.serialize(TOPIC, createMessageThatTriggersRetryableException(numFailures))); + } + + private void send(int partition, V message) + { + send(partition, serializer.serialize(TOPIC, message)); + } + + private void send(int partition, byte[] bytes) + { + nextMessage++; + kafkaTemplate + .send(TOPIC, partition, "EGAL", bytes) + .thenAccept(result -> + { + RecordMetadata metadata = result.getRecordMetadata(); + currentOffsets[metadata.partition()] = metadata.offset(); + }); + } + + + + @TestConfiguration + static class ConsumerRunnableTestConfig + { + @Bean + AdminClient adminClient(@Value("${spring.embedded.kafka.brokers}") String kafkaBroker) + { + Map properties = new HashMap<>(); + properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBroker); + return AdminClient.create(properties); + } + + @Bean + Clock clock() + { + return Clock.systemDefaultZone(); + } + } +} diff --git a/src/test/java/de/juplo/kafka/AbstractMockRecordHandler.java b/src/test/java/de/juplo/kafka/AbstractMockRecordHandler.java new file mode 100644 index 00000000..939ee9d0 --- /dev/null +++ b/src/test/java/de/juplo/kafka/AbstractMockRecordHandler.java @@ -0,0 +1,66 @@ +package de.juplo.kafka; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; + + +@RequiredArgsConstructor +@Slf4j +public abstract class AbstractMockRecordHandler implements RecordHandler +{ + final Map retriableErrors = new HashMap<>(); + + Duration normalRecordHandlingDelay; + Duration exceptionalRecordHandlingDelay; + + private int numMessagesHandled = 0; + + public int getNumMessagesHandled() + { + return numMessagesHandled; + } + + @Override + public void handleRecord( + String topic, + Integer partition, + Long offset, + String key, + V value) throws RetryableErrorException, NonRetryableErrorException + { + generateError(new OffsetInPartition(offset, partition), value); + sleep(normalRecordHandlingDelay); + numMessagesHandled++; + log.trace("Handled {} messages so far", numMessagesHandled); + } + + abstract void generateError( + OffsetInPartition offset, + V value) throws RetryableErrorException, NonRetryableErrorException; + + void sleep(Duration duration) + { + try + { + Thread.sleep(duration); + } + catch(InterruptedException e) + { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + + public void clear() + { + retriableErrors.clear(); + numMessagesHandled = 0; + } + + + record OffsetInPartition(long offset, int partition) {} +} diff --git a/src/test/java/de/juplo/kafka/BackOffStateTest.java b/src/test/java/de/juplo/kafka/BackOffStateTest.java new file mode 100644 index 00000000..f58f545b --- /dev/null +++ b/src/test/java/de/juplo/kafka/BackOffStateTest.java @@ -0,0 +1,304 @@ +package de.juplo.kafka; + +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.util.backoff.BackOff; +import org.springframework.util.backoff.BackOffExecution; + +import java.time.Clock; +import java.time.Instant; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.BDDMockito.given; + + +@ExtendWith(MockitoExtension.class) +class BackOffStateTest +{ + final static String ID = "TEST"; + final static long OFFSET = 666; + final static Instant NOW = Instant.now(); + final static long BACK_OFF = 1000l; + + + @Mock Clock clock; + @Mock BackOff backOff; + @Mock BackOffExecution backOffExecution; + + + private BackOffState NotStartedBackOffState() + { + // GIVEN + + // WHEN + BackOffState backOffState = new BackOffState(ID, clock, backOff); + + return backOffState; + } + + @Test + @DisplayName("A not started BackOffState is not waiting for a retry") + void NotStartedBackOffStateIsNotWaitingForRetry() + { + // GIVEN + BackOffState backOffState = NotStartedBackOffState(); + + // WHEN + + // THEN + assertThat(backOffState.isWaitingForNextRetry()).isFalse(); + } + + @Test + @DisplayName("A not started BackOffState is not started") + void NotStartedBackOffStateIsNotStarted() + { + // GIVEN + BackOffState backOffState = NotStartedBackOffState(); + + // WHEN + + // THEN + assertThat(backOffState.isStarted(OFFSET)).isFalse(); + } + + + private BackOffState StartedBackoffStateWithNoRetries() + { + // GIVEN + given(backOff.start()).willReturn(backOffExecution); + given(backOffExecution.nextBackOff()).willReturn(BackOffExecution.STOP); + + // WHEN + BackOffState backOffState = new BackOffState(ID, clock, backOff); + backOffState.start(OFFSET); + + return backOffState; + } + + @Test + @DisplayName("A started BackOffState with no retries is not waiting for a retry") + void StartedBackOffStateWithNoRetriesIsNotWaitingForRetry() + { + // GIVEN + BackOffState backOffState = StartedBackoffStateWithNoRetries(); + + // WHEN + + // THEN + assertThat(backOffState.isWaitingForNextRetry()).isFalse(); + } + + @Test + @DisplayName("A started BackOffState with no retries is started") + void StartedBackOffStateWithNoRetriesIsStarted() + { + // GIVEN + BackOffState backOffState = StartedBackoffStateWithNoRetries(); + + // WHEN + + // THEN + assertThat(backOffState.isStarted(OFFSET)).isTrue(); + } + + @Test + @DisplayName("A started BackOffState with no retries is completed") + void StartedBackOffStateWithNoRetriesIsCompleted() + { + // GIVEN + BackOffState backOffState = StartedBackoffStateWithNoRetries(); + + // WHEN + + // THEN + assertThat(backOffState.isCompleted(OFFSET)).isTrue(); + } + + + private BackOffState StartedBackoffStateWithRetries() + { + // GIVEN + given(clock.instant()).willReturn(NOW); + given(backOff.start()).willReturn(backOffExecution); + given(backOffExecution.nextBackOff()).willReturn(BACK_OFF); + + // WHEN + BackOffState backOffState = new BackOffState(ID, clock, backOff); + backOffState.start(OFFSET); + + return backOffState; + } + + @Test + @DisplayName("A started BackOffState is waiting for a retry if the time is not due") + void StartedBackOffStateIsWaitingForRetryIfTimeIsNotDue() + { + // GIVEN + BackOffState backOffState = StartedBackoffStateWithRetries(); + given(clock.instant()).willReturn(NOW.plusMillis(BACK_OFF)); + + // WHEN + boolean result = backOffState.isWaitingForNextRetry(); + + // THEN + assertThat(result).isTrue(); + } + + @Test + @DisplayName("A started BackOffState is started if the time is not due") + void StartedBackOffStateIsStarted() + { + // GIVEN + BackOffState backOffState = StartedBackoffStateWithRetries(); + given(clock.instant()).willReturn(NOW.plusMillis(BACK_OFF)); + + // WHEN + backOffState.isWaitingForNextRetry(); + + // THEN + assertThat(backOffState.isStarted(OFFSET)).isTrue(); + } + + @Test + @DisplayName("A started BackOffState is not completed if the time is not due") + void StartedBackOffStateIsNotCompletedIfTimeIsNotDue() + { + // GIVEN + BackOffState backOffState = StartedBackoffStateWithRetries(); + given(clock.instant()).willReturn(NOW.plusMillis(BACK_OFF)); + + // WHEN + backOffState.isWaitingForNextRetry(); + + // THEN + assertThat(backOffState.isCompleted(OFFSET)).isFalse(); + } + + @Test + @DisplayName("A started BackOffState is not waiting for a retry if the time is due but the retry not yet completed") + void StartedBackOffStateIsNotWaitingForRetryIfTheTimeIsDueButRetryNotCompleted() + { + // GIVEN + BackOffState backOffState = StartedBackoffStateWithRetries(); + given(clock.instant()).willReturn(NOW.plusMillis(BACK_OFF).plusMillis(1)); + given(backOffExecution.nextBackOff()).willReturn(BACK_OFF); + + // WHEN + boolean result = backOffState.isWaitingForNextRetry(); + + // THEN + assertThat(result).isFalse(); + } + + @Test + @DisplayName("A started BackOffState is started if the time is due but the retry not yet completed") + void StartedBackOffStateIsStartedIfTheTimeIsDueButRetryNotCompleted() + { + // GIVEN + BackOffState backOffState = StartedBackoffStateWithRetries(); + given(clock.instant()).willReturn(NOW.plusMillis(BACK_OFF).plusMillis(1)); + given(backOffExecution.nextBackOff()).willReturn(BACK_OFF); + + // WHEN + backOffState.isWaitingForNextRetry(); + + // THEN + assertThat(backOffState.isStarted(OFFSET)).isTrue(); + } + + @Test + @DisplayName("A started BackOffState is not completed if the time is due but the retry not yet completed") + void StartedBackOffStateIsNotCompletedIfTheTimeIsDueButRetryNotCompleted() + { + // GIVEN + BackOffState backOffState = StartedBackoffStateWithRetries(); + given(clock.instant()).willReturn(NOW.plusMillis(BACK_OFF).plusMillis(1)); + given(backOffExecution.nextBackOff()).willReturn(BACK_OFF); + + // WHEN + backOffState.isWaitingForNextRetry(); + + // THEN + assertThat(backOffState.isCompleted(OFFSET)).isFalse(); + } + + @Test + @DisplayName("A started BackOffState is not waiting for a retry if the time is due and the retry is completed") + void StartedBackOffStateIsNotWaitingForRetryIfTheTimeIsDueAndRetryIsCompleted() + { + // GIVEN + BackOffState backOffState = StartedBackoffStateWithRetries(); + given(clock.instant()).willReturn(NOW.plusMillis(BACK_OFF).plusMillis(1)); + given(backOffExecution.nextBackOff()).willReturn(BackOffExecution.STOP); + + // WHEN + boolean result = backOffState.isWaitingForNextRetry(); + + // THEN + assertThat(result).isFalse(); + } + + @Test + @DisplayName("A started BackOffState is started if the time is due and the retry is completed") + void StartedBackOffStateIsStartedIfTheTimeIsDueAndRetryIsCompleted() + { + // GIVEN + BackOffState backOffState = StartedBackoffStateWithRetries(); + given(clock.instant()).willReturn(NOW.plusMillis(BACK_OFF).plusMillis(1)); + given(backOffExecution.nextBackOff()).willReturn(BackOffExecution.STOP); + + // WHEN + backOffState.isWaitingForNextRetry(); + + // THEN + assertThat(backOffState.isStarted(OFFSET)).isTrue(); + } + + @Test + @DisplayName("A started BackOffState is completed if the time is due and the retry is completed") + void StartedBackOffStateIsCompletedIfTheTimeIsDueAndRetryIsCompleted() + { + // GIVEN + BackOffState backOffState = StartedBackoffStateWithRetries(); + given(clock.instant()).willReturn(NOW.plusMillis(BACK_OFF).plusMillis(1)); + given(backOffExecution.nextBackOff()).willReturn(BackOffExecution.STOP); + + // WHEN + backOffState.isWaitingForNextRetry(); + + // THEN + assertThat(backOffState.isCompleted(OFFSET)).isTrue(); + } + + @Test + @DisplayName("A started BackOffState is not waiting for a retry after a reset") + void StartedBackOffStateIsNotWaitingForRetryAfterReset() + { + // GIVEN + BackOffState backOffState = StartedBackoffStateWithRetries(); + + // WHEN + backOffState.reset(); + + // THEN + assertThat(backOffState.isWaitingForNextRetry()).isFalse(); + } + + @Test + @DisplayName("A started BackOffState is not started after a reset") + void StartedBackOffStateIsNotStartedAfterReset() + { + // GIVEN + BackOffState backOffState = StartedBackoffStateWithRetries(); + + // WHEN + backOffState.reset(); + + // THEN + assertThat(backOffState.isStarted(OFFSET)).isFalse(); + } +} diff --git a/src/test/java/de/juplo/kafka/LongExampleConsumerTest.java b/src/test/java/de/juplo/kafka/LongExampleConsumerTest.java new file mode 100644 index 00000000..f62ba56b --- /dev/null +++ b/src/test/java/de/juplo/kafka/LongExampleConsumerTest.java @@ -0,0 +1,64 @@ +package de.juplo.kafka; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.LongDeserializer; +import org.apache.kafka.common.serialization.LongSerializer; +import org.apache.kafka.common.serialization.Serializer; +import org.springframework.boot.autoconfigure.kafka.KafkaProperties; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; + +import java.util.Map; + + +public class LongExampleConsumerTest extends AbstractExampleConsumerTest +{ + @Override + AbstractMockRecordHandler createMockRecordHandler() + { + return new LongMockRecordHandler(); + } + + @Override + Serializer createSerializer() + { + return new LongSerializer(); + } + + @Override + Consumer createConsumer(KafkaProperties kafkaProperties) + { + Map properties = kafkaProperties.buildConsumerProperties(); + properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName()); + return new DefaultKafkaConsumerFactory<>(properties).createConsumer(); + } + + @Override + Long createValidMessage() + { + return nextMessage; + } + + @Override + Long createMessageThatTriggersRuntimeException() + { + return VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION; + } + + @Override + Long createMessageThatTriggersNonRetryableException() + { + return VALUE_THAT_TRIGGERS_NONRETRYABLEEXCEPTION; + } + + @Override + Long createMessageThatTriggersRetryableException(int numFailures) + { + return VALUE_THAT_TRIGGERS_RETRYABLEEXCEPTION * numFailures; + } + + + public final static long VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION = -1; + public final static long VALUE_THAT_TRIGGERS_RETRYABLEEXCEPTION = -2; + public final static long VALUE_THAT_TRIGGERS_NONRETRYABLEEXCEPTION = -3; +} diff --git a/src/test/java/de/juplo/kafka/LongMockRecordHandler.java b/src/test/java/de/juplo/kafka/LongMockRecordHandler.java new file mode 100644 index 00000000..0833116c --- /dev/null +++ b/src/test/java/de/juplo/kafka/LongMockRecordHandler.java @@ -0,0 +1,74 @@ +package de.juplo.kafka; + +import lombok.extern.slf4j.Slf4j; + +import static de.juplo.kafka.LongExampleConsumerTest.*; + + +@Slf4j +public class LongMockRecordHandler extends AbstractMockRecordHandler +{ + @Override + void generateError( + OffsetInPartition offset, + Long value) throws RetryableErrorException, NonRetryableErrorException + { + if (value == null || value > -1) + { + // Valid message... + return; + } + + if (value == VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION) + { + throw new RuntimeException("Unexpected application error!"); + } + + if (value == VALUE_THAT_TRIGGERS_NONRETRYABLEEXCEPTION) + { + throw new NonRetryableErrorException("Non-Retryable application error!"); + } + + if ((float)value % (float) VALUE_THAT_TRIGGERS_RETRYABLEEXCEPTION == 0f) + { + int totalOccurrences = (int) (value / VALUE_THAT_TRIGGERS_RETRYABLEEXCEPTION); + int occurrence = retriableErrors.compute( + offset, + (k, v) -> + { + if (v == null) + { + v = totalOccurrences; + } + else + { + v--; + } + + return v; + }); + + if (occurrence <= 0) + { + retriableErrors.remove(offset); + } + else + { + log.debug( + "Simulating occurrence #{} of {} for a retryable error at offset {} in partition {}", + totalOccurrences - occurrence + 1, + totalOccurrences, + offset.offset(), + offset.partition()); + sleep(exceptionalRecordHandlingDelay); + throw new RetryableErrorException("Retryable application error! Occurrence #" + (totalOccurrences - occurrence + 1)); + } + + log.info("Simulating a resolved retryable error after {} occurrences of the error", totalOccurrences); + } + else + { + log.warn("Not specifically mapped error: {}", value); + } + } +} -- 2.39.5