From d01eca47127638c35a72f430f4a67bd8c289ebb5 Mon Sep 17 00:00:00 2001
From: Kai Moritz
Date: Sat, 11 Jan 2025 17:10:02 +0100
Subject: [PATCH] Version of `spring-consumer` with complete error handling
MIME-Version: 1.0
Content-Type: text/plain; charset=utf8
Content-Transfer-Encoding: 8bit

* Exceptions & config for retriable and non-retriable errors
* RED: Defined the expected behaviour for the error handling
* GREEN: Implemented the extended error handling
* Made the timeout for the poll request configurable
* Tightened the timings for the `ExampleConsumerTest`
* No delay for errors in the retry tests
* Additional log message for the retry flow
* Removed a distinction that was never evaluated
* Creation of the `ExampleConsumer` in the tests is configurable via a method
* GREEN: Defined the expected behaviour for different delays
* Slack for the poll interval is explicitly configurable
* Added a test for retries with a fixed back-off to `ExampleConsumerTest`
* The offset of an inactive `BackOffState` should be an invalid value
* Made the inner class `BackOffState` static
* Extracted the inner class `BackOffState`
* Improved the logging in `BackOffState`
* Clearer method names in `BackOffState`
* RED - Implemented a unit test for `BackOffState`
* GREEN - Fixed an error in the initialization of `BackOffState`
* Extracted duplicated code in `BackOffStateTest` into methods
* Mocking with `@Mock` at class level spares us a flood of parameters
* Removed meaningless tests from `BackOffStateTest`
* Moved the back-off time in `BackOffStateTest` into a static variable
* Simplified and improved the log message of `BackOffState`
* RED: Defined the corrected behaviour for `BackOffState`
* GREEN: Fixed the implementation of `BackOffState`
* Switched the `ExampleConsumerTest` to AssertJ
* Made `fetch.max.wait` configurable
* Added a test for many errors in one partition to `BackOffStateTest`
* The check of the `BackOffState` state always takes the offset into account
* Unconditional reset (and a better method name) for `BackOffState`
* `BackOffState` is created only once and afterwards reset and restarted
* Removed a superfluous attribute from `BackOffState`
---
 README.sh                                     |   2 +-
 build.gradle                                  |   2 +-
 docker/docker-compose.yml                     |   4 +-
 pom.xml                                       |   2 +-
 .../juplo/kafka/ApplicationConfiguration.java |  31 +-
 .../de/juplo/kafka/ApplicationProperties.java |  15 +
 .../java/de/juplo/kafka/BackOffState.java     |  98 +++++
 .../java/de/juplo/kafka/ExampleConsumer.java  | 150 ++++++-
 .../kafka/NonRetriableErrorException.java     |   9 +
 .../java/de/juplo/kafka/RecordHandler.java    |   2 +-
 .../juplo/kafka/RetriableErrorException.java  |   9 +
 src/main/resources/application.yml            |   7 +
 .../java/de/juplo/kafka/BackOffStateTest.java | 304 +++++++++++++
 .../de/juplo/kafka/ExampleConsumerTest.java   | 413 +++++++++++++++++-
 .../de/juplo/kafka/MockRecordHandler.java     |  93 +++-
 15 files changed, 1103 insertions(+), 38 deletions(-)
 create mode 100644 src/main/java/de/juplo/kafka/BackOffState.java
 create mode 100644 src/main/java/de/juplo/kafka/NonRetriableErrorException.java
 create mode 100644 src/main/java/de/juplo/kafka/RetriableErrorException.java
 create mode 100644 src/test/java/de/juplo/kafka/BackOffStateTest.java

diff --git a/README.sh b/README.sh
index 392b237e..fcf23227 100755
--- a/README.sh
+++ b/README.sh
@@ -1,6 +1,6 @@
 #!/bin/bash
-IMAGE=juplo/spring-consumer:1.1-deserialization-error-SNAPSHOT
+IMAGE=juplo/spring-consumer:1.1-error-handling-SNAPSHOT if [ "$1" = "cleanup" ] then diff --git a/build.gradle b/build.gradle index 9df5cdad..3df5a516 100644 --- a/build.gradle +++ b/build.gradle @@ -8,7 +8,7 @@ plugins { } group = 'de.juplo.kafka' -version = '1.1-deserialization-error-SNAPSHOT' +version = '1.1-error-handling-SNAPSHOT' java { toolchain { diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 56003f70..e23f799e 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -150,14 +150,14 @@ services: juplo.consumer.topic: test peter: - image: juplo/spring-consumer:1.1-deserialization-error-SNAPSHOT + image: juplo/spring-consumer:1.1-error-handling-SNAPSHOT environment: juplo.bootstrap-server: kafka:9092 juplo.client-id: peter juplo.consumer.topic: test ute: - image: juplo/spring-consumer:1.1-deserialization-error-SNAPSHOT + image: juplo/spring-consumer:1.1-error-handling-SNAPSHOT environment: juplo.bootstrap-server: kafka:9092 juplo.client-id: ute diff --git a/pom.xml b/pom.xml index b5cb106e..6bf1dfee 100644 --- a/pom.xml +++ b/pom.xml @@ -15,7 +15,7 @@ spring-consumer Spring Consumer Super Simple Consumer-Group, that is implemented as Spring-Boot application and configured by Spring Kafka - 1.1-deserialization-error-SNAPSHOT + 1.1-error-handling-SNAPSHOT 21 diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index c4174842..eeef2a60 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -10,7 +10,10 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.util.backoff.BackOff; +import org.springframework.util.backoff.FixedBackOff; +import java.time.Clock; import java.util.Properties; @@ -24,6 +27,8 @@ public class ApplicationConfiguration Consumer kafkaConsumer, RecordHandler recordHandler, ApplicationProperties properties, + Clock clock, + BackOff backOffStrategy, ConfigurableApplicationContext applicationContext) { return @@ -32,6 +37,12 @@ public class ApplicationConfiguration properties.getConsumerProperties().getTopic(), kafkaConsumer, recordHandler, + clock, + properties.getConsumerProperties().getPollRequestTimeout(), + properties.getConsumerProperties().getMaxPollInterval(), + properties.getConsumerProperties().getMaxTimePerRecord(), + properties.getConsumerProperties().getMinSlackPerPollInterval(), + backOffStrategy, () -> applicationContext.close()); } @@ -41,6 +52,18 @@ public class ApplicationConfiguration return (topic, partition, offset, key, value) -> log.info("No-Ops Handler called for {}={}", key, value); } + @Bean + public BackOff backOffStrategy(ApplicationProperties properties) + { + return new FixedBackOff(0l, properties.getConsumerProperties().getNumRetries()); + } + + @Bean + public Clock clock() + { + return Clock.systemDefaultZone(); + } + @Bean(destroyMethod = "") public KafkaConsumer kafkaConsumer(ApplicationProperties properties) { @@ -52,11 +75,11 @@ public class ApplicationConfiguration { props.put("auto.offset.reset", properties.getConsumerProperties().getAutoOffsetReset().name()); } - if (properties.getConsumerProperties().getAutoCommitInterval() != null) - { - props.put("auto.commit.interval", 
properties.getConsumerProperties().getAutoCommitInterval());
-    }
+    props.put("auto.commit.interval", properties.getConsumerProperties().getAutoCommitInterval());
     props.put("metadata.maxage.ms", 5000); // 5 seconds
+    props.put("max.poll.interval.ms", (int) properties.getConsumerProperties().getMaxPollInterval().toMillis());
+    props.put("max.poll.records", properties.getConsumerProperties().getMaxPollRecords());
+    props.put("fetch.max.wait.ms", (int) properties.getConsumerProperties().getFetchMaxWait().toMillis());
     props.put("partition.assignment.strategy", StickyAssignor.class.getName());
     props.put("key.deserializer", StringDeserializer.class.getName());
     props.put("value.deserializer", LongDeserializer.class.getName());
diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java
index c8193c9f..b84c2d21 100644
--- a/src/main/java/de/juplo/kafka/ApplicationProperties.java
+++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java
@@ -45,7 +45,22 @@ public class ApplicationProperties
     @NotEmpty
     private String topic;
     private OffsetReset autoOffsetReset;
+    @NotNull
     private Duration autoCommitInterval;
+    @NotNull
+    private Duration pollRequestTimeout;
+    @NotNull
+    private Duration maxPollInterval;
+    @NotNull
+    private int maxPollRecords;
+    @NotNull
+    private Duration fetchMaxWait;
+    @NotNull
+    private Duration maxTimePerRecord;
+    @NotNull
+    private Duration minSlackPerPollInterval;
+    @NotNull
+    private int numRetries;
 
     enum OffsetReset { latest, earliest, none }
   }
diff --git a/src/main/java/de/juplo/kafka/BackOffState.java b/src/main/java/de/juplo/kafka/BackOffState.java
new file mode 100644
index 00000000..8c6785e3
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/BackOffState.java
@@ -0,0 +1,98 @@
+package de.juplo.kafka;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.util.backoff.BackOff;
+import org.springframework.util.backoff.BackOffExecution;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+
+
+@RequiredArgsConstructor
+@Slf4j
+class BackOffState
+{
+  private final String logPrefix;
+  private final Clock clock;
+  private final BackOff backOffStrategy;
+
+  @Getter
+  private long offset;
+  private BackOffExecution backOffExecution;
+  private int numRetries = 0;
+  private Instant timeNextRetryIsDue;
+
+
+  void start(long offset)
+  {
+    this.offset = offset;
+    log.info("{} - Back-Off requested for offset={}", logPrefix, offset);
+    backOffExecution = backOffStrategy.start();
+    initializeNextBackOff();
+  }
+
+  boolean isWaitingForNextRetry()
+  {
+    if (timeNextRetryIsDue == null)
+    {
+      return false;
+    }
+
+    Instant now = clock.instant();
+    Duration remaining = Duration.between(now, timeNextRetryIsDue);
+    if (remaining.isNegative())
+    {
+      log.info(
+          "{} - {}. 
retry for offset={}, lateness: {}", + logPrefix, + numRetries, + offset, + remaining.abs()); + initializeNextBackOff(); + return false; + } + else + { + log.info( + "{} - Next retry for offset={} is due in {}", + logPrefix, + offset, + remaining); + return true; + } + } + + boolean isStarted(long offset) + { + return this.offset == offset && backOffExecution != null; + } + + boolean isCompleted(long offset) + { + return this.offset == offset && timeNextRetryIsDue == null; + } + + void reset() + { + timeNextRetryIsDue = null; + offset = -1l; + } + + private void initializeNextBackOff() + { + long backOffMillis = backOffExecution.nextBackOff(); + + if (backOffMillis == BackOffExecution.STOP) + { + timeNextRetryIsDue = null; + } + else + { + numRetries++; + timeNextRetryIsDue = clock.instant().plusMillis(backOffMillis); + } + } +} diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index 7e820ea2..47d98b6d 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -2,23 +2,36 @@ package de.juplo.kafka; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.RecordDeserializationException; import org.apache.kafka.common.errors.WakeupException; +import org.springframework.util.backoff.BackOff; +import java.time.Clock; import java.time.Duration; +import java.time.Instant; import java.util.Arrays; +import java.util.Collection; +import java.util.List; @Slf4j -public class ExampleConsumer implements Runnable +public class ExampleConsumer implements ConsumerRebalanceListener, Runnable { private final String id; private final String topic; private final Consumer consumer; private final RecordHandler recordHandler; private final Thread workerThread; + private final Clock clock; + private final Duration pollRequestTimeout; + private final Duration maxPollInterval; + private final Duration minTimeForNextRecord; + private final BackOff backOffStrategy; + private final BackOffState[] backOffState; private final Runnable closeCallback; private long consumed = 0; @@ -29,12 +42,27 @@ public class ExampleConsumer implements Runnable String topic, Consumer consumer, RecordHandler recordHandler, + Clock clock, + Duration pollRequestTimeout, + Duration maxPollInterval, + Duration maxTimePerRecord, + Duration minSlackPerPollInterval, + BackOff backOffStrategy, Runnable closeCallback) { this.id = clientId; this.topic = topic; this.consumer = consumer; this.recordHandler = recordHandler; + this.clock = clock; + this.pollRequestTimeout = pollRequestTimeout; + this.maxPollInterval = maxPollInterval; + this.minTimeForNextRecord = maxTimePerRecord.plus(minSlackPerPollInterval); + this.backOffStrategy = backOffStrategy; + + int numPartitions = consumer.partitionsFor(topic).size(); + log.info("{} - Topic {} has {} partitions", id, topic, numPartitions); + this.backOffState = new BackOffState[numPartitions]; workerThread = new Thread(this, "ExampleConsumer Worker-Thread"); workerThread.start(); @@ -49,23 +77,106 @@ public class ExampleConsumer implements Runnable try { log.info("{} - Subscribing to topic {}", id, topic); - consumer.subscribe(Arrays.asList(topic)); + 
consumer.subscribe(Arrays.asList(topic), this); while (true) { try { - ConsumerRecords records = consumer.poll(Duration.ofSeconds(1)); + ConsumerRecords records = consumer.poll(pollRequestTimeout); log.info("{} - Received {} messages", id, records.count()); - for (ConsumerRecord record : records) + + Instant deadline = clock.instant().plus(maxPollInterval); + boolean abortCurrentPoll = false; + + for (TopicPartition topicPartition : records.partitions()) { - handleRecord( - record.topic(), - record.partition(), - record.offset(), - record.key(), - record.value()); + if (backOffState[topicPartition.partition()].isWaitingForNextRetry()) + { + log.info("{} - {} is blocked, because it is waiting for a retry", id, topicPartition); + consumer.seek(topicPartition, backOffState[topicPartition.partition()].getOffset()); + continue; + } + + List> recordsForPartition = records.records(topicPartition); + log.debug( + "{} - Received {} messages for partition {}", + id, + recordsForPartition.size(), + topicPartition); + + for (ConsumerRecord record : recordsForPartition) + { + if (abortCurrentPoll) + { + consumer.seek(topicPartition, record.offset()); + break; + } + + Instant now = clock.instant(); + Duration timeLeft = Duration.between(now, deadline); + log.trace("{} - Time left for current poll: {}", id, timeLeft); + + if (timeLeft.minus(minTimeForNextRecord).isNegative()) + { + log.info( + "{} - Aborting record handling, because only {} are left until the poll-interval expires!", + id, + timeLeft); + abortCurrentPoll = true; + consumer.seek(topicPartition, record.offset()); + break; + } + + try + { + handleRecord( + record.topic(), + record.partition(), + record.offset(), + record.key(), + record.value()); + } + catch (RetriableErrorException e) + { + // Seeking to the offset of the record, that raised the exception, and + // leaving the loop afterwards, retries the record + int partition = topicPartition.partition(); + if (!backOffState[partition].isStarted(record.offset())) + { + log.info("{} - First occurrence of a retryable error: {}", id, e.toString()); + backOffState[partition].start(record.offset()); + consumer.seek(topicPartition, record.offset()); + break; + } + else + { + if (backOffState[partition].isCompleted(record.offset())) + { + log.warn("{} - Ignoring retryable error: {}", id, e.toString()); + } + else + { + log.info( + "{} - Retry in progress for offset={} in {}, error: {}", + id, + record.offset(), + partition, + e.toString()); + consumer.seek(topicPartition, record.offset()); + break; + } + } + } + catch (NonRetriableErrorException e) + { + // Just ignore, to skip + log.warn("{} - Ignoring non-retryable error: {}", id, e.toString()); + } + + backOffState[topicPartition.partition()].reset(); + } } } catch (RecordDeserializationException e) @@ -104,7 +215,7 @@ public class ExampleConsumer implements Runnable Integer partition, Long offset, K key, - V value) + V value) throws RetriableErrorException, NonRetriableErrorException { consumed++; log.info("{} - partition={}-{}, offset={}: {}={}", id, topic, partition, offset, key, value); @@ -112,6 +223,23 @@ public class ExampleConsumer implements Runnable } + @Override + public void onPartitionsAssigned(Collection partitions) + { + partitions.forEach(topicPartition -> + backOffState[topicPartition.partition()] = new BackOffState( + id + " - partition=" + topicPartition.partition(), + clock, + backOffStrategy)); + } + + @Override + public void onPartitionsRevoked(Collection partitions) + { + partitions.forEach(tp -> 
backOffState[tp.partition()] = null); + } + + public void shutdown() throws InterruptedException { log.info("{} - Waking up the consumer", id); diff --git a/src/main/java/de/juplo/kafka/NonRetriableErrorException.java b/src/main/java/de/juplo/kafka/NonRetriableErrorException.java new file mode 100644 index 00000000..0eb0ff28 --- /dev/null +++ b/src/main/java/de/juplo/kafka/NonRetriableErrorException.java @@ -0,0 +1,9 @@ +package de.juplo.kafka; + +public class NonRetriableErrorException extends Exception +{ + public NonRetriableErrorException(String message) + { + super(message); + } +} diff --git a/src/main/java/de/juplo/kafka/RecordHandler.java b/src/main/java/de/juplo/kafka/RecordHandler.java index a7b65af2..5edcf587 100644 --- a/src/main/java/de/juplo/kafka/RecordHandler.java +++ b/src/main/java/de/juplo/kafka/RecordHandler.java @@ -7,5 +7,5 @@ public interface RecordHandler Integer partition, Long offset, K key, - V value); + V value) throws RetriableErrorException, NonRetriableErrorException; } diff --git a/src/main/java/de/juplo/kafka/RetriableErrorException.java b/src/main/java/de/juplo/kafka/RetriableErrorException.java new file mode 100644 index 00000000..598ddb09 --- /dev/null +++ b/src/main/java/de/juplo/kafka/RetriableErrorException.java @@ -0,0 +1,9 @@ +package de.juplo.kafka; + +public class RetriableErrorException extends Exception +{ + public RetriableErrorException(String message) + { + super(message); + } +} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 7a06731c..6528b284 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -6,6 +6,13 @@ juplo: topic: test auto-offset-reset: earliest auto-commit-interval: 5s + poll-request-timeout: 1s + max-poll-interval: 5m + max-poll-records: 500 + fetch-max-wait: 500ms + max-time-per-record: 30s + min-slack-per-poll-interval: 1s + num-retries: 10 management: endpoint: shutdown: diff --git a/src/test/java/de/juplo/kafka/BackOffStateTest.java b/src/test/java/de/juplo/kafka/BackOffStateTest.java new file mode 100644 index 00000000..f58f545b --- /dev/null +++ b/src/test/java/de/juplo/kafka/BackOffStateTest.java @@ -0,0 +1,304 @@ +package de.juplo.kafka; + +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.util.backoff.BackOff; +import org.springframework.util.backoff.BackOffExecution; + +import java.time.Clock; +import java.time.Instant; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.BDDMockito.given; + + +@ExtendWith(MockitoExtension.class) +class BackOffStateTest +{ + final static String ID = "TEST"; + final static long OFFSET = 666; + final static Instant NOW = Instant.now(); + final static long BACK_OFF = 1000l; + + + @Mock Clock clock; + @Mock BackOff backOff; + @Mock BackOffExecution backOffExecution; + + + private BackOffState NotStartedBackOffState() + { + // GIVEN + + // WHEN + BackOffState backOffState = new BackOffState(ID, clock, backOff); + + return backOffState; + } + + @Test + @DisplayName("A not started BackOffState is not waiting for a retry") + void NotStartedBackOffStateIsNotWaitingForRetry() + { + // GIVEN + BackOffState backOffState = NotStartedBackOffState(); + + // WHEN + + // THEN + assertThat(backOffState.isWaitingForNextRetry()).isFalse(); + } + + @Test + @DisplayName("A not started BackOffState is not 
started") + void NotStartedBackOffStateIsNotStarted() + { + // GIVEN + BackOffState backOffState = NotStartedBackOffState(); + + // WHEN + + // THEN + assertThat(backOffState.isStarted(OFFSET)).isFalse(); + } + + + private BackOffState StartedBackoffStateWithNoRetries() + { + // GIVEN + given(backOff.start()).willReturn(backOffExecution); + given(backOffExecution.nextBackOff()).willReturn(BackOffExecution.STOP); + + // WHEN + BackOffState backOffState = new BackOffState(ID, clock, backOff); + backOffState.start(OFFSET); + + return backOffState; + } + + @Test + @DisplayName("A started BackOffState with no retries is not waiting for a retry") + void StartedBackOffStateWithNoRetriesIsNotWaitingForRetry() + { + // GIVEN + BackOffState backOffState = StartedBackoffStateWithNoRetries(); + + // WHEN + + // THEN + assertThat(backOffState.isWaitingForNextRetry()).isFalse(); + } + + @Test + @DisplayName("A started BackOffState with no retries is started") + void StartedBackOffStateWithNoRetriesIsStarted() + { + // GIVEN + BackOffState backOffState = StartedBackoffStateWithNoRetries(); + + // WHEN + + // THEN + assertThat(backOffState.isStarted(OFFSET)).isTrue(); + } + + @Test + @DisplayName("A started BackOffState with no retries is completed") + void StartedBackOffStateWithNoRetriesIsCompleted() + { + // GIVEN + BackOffState backOffState = StartedBackoffStateWithNoRetries(); + + // WHEN + + // THEN + assertThat(backOffState.isCompleted(OFFSET)).isTrue(); + } + + + private BackOffState StartedBackoffStateWithRetries() + { + // GIVEN + given(clock.instant()).willReturn(NOW); + given(backOff.start()).willReturn(backOffExecution); + given(backOffExecution.nextBackOff()).willReturn(BACK_OFF); + + // WHEN + BackOffState backOffState = new BackOffState(ID, clock, backOff); + backOffState.start(OFFSET); + + return backOffState; + } + + @Test + @DisplayName("A started BackOffState is waiting for a retry if the time is not due") + void StartedBackOffStateIsWaitingForRetryIfTimeIsNotDue() + { + // GIVEN + BackOffState backOffState = StartedBackoffStateWithRetries(); + given(clock.instant()).willReturn(NOW.plusMillis(BACK_OFF)); + + // WHEN + boolean result = backOffState.isWaitingForNextRetry(); + + // THEN + assertThat(result).isTrue(); + } + + @Test + @DisplayName("A started BackOffState is started if the time is not due") + void StartedBackOffStateIsStarted() + { + // GIVEN + BackOffState backOffState = StartedBackoffStateWithRetries(); + given(clock.instant()).willReturn(NOW.plusMillis(BACK_OFF)); + + // WHEN + backOffState.isWaitingForNextRetry(); + + // THEN + assertThat(backOffState.isStarted(OFFSET)).isTrue(); + } + + @Test + @DisplayName("A started BackOffState is not completed if the time is not due") + void StartedBackOffStateIsNotCompletedIfTimeIsNotDue() + { + // GIVEN + BackOffState backOffState = StartedBackoffStateWithRetries(); + given(clock.instant()).willReturn(NOW.plusMillis(BACK_OFF)); + + // WHEN + backOffState.isWaitingForNextRetry(); + + // THEN + assertThat(backOffState.isCompleted(OFFSET)).isFalse(); + } + + @Test + @DisplayName("A started BackOffState is not waiting for a retry if the time is due but the retry not yet completed") + void StartedBackOffStateIsNotWaitingForRetryIfTheTimeIsDueButRetryNotCompleted() + { + // GIVEN + BackOffState backOffState = StartedBackoffStateWithRetries(); + given(clock.instant()).willReturn(NOW.plusMillis(BACK_OFF).plusMillis(1)); + given(backOffExecution.nextBackOff()).willReturn(BACK_OFF); + + // WHEN + boolean result = 
backOffState.isWaitingForNextRetry(); + + // THEN + assertThat(result).isFalse(); + } + + @Test + @DisplayName("A started BackOffState is started if the time is due but the retry not yet completed") + void StartedBackOffStateIsStartedIfTheTimeIsDueButRetryNotCompleted() + { + // GIVEN + BackOffState backOffState = StartedBackoffStateWithRetries(); + given(clock.instant()).willReturn(NOW.plusMillis(BACK_OFF).plusMillis(1)); + given(backOffExecution.nextBackOff()).willReturn(BACK_OFF); + + // WHEN + backOffState.isWaitingForNextRetry(); + + // THEN + assertThat(backOffState.isStarted(OFFSET)).isTrue(); + } + + @Test + @DisplayName("A started BackOffState is not completed if the time is due but the retry not yet completed") + void StartedBackOffStateIsNotCompletedIfTheTimeIsDueButRetryNotCompleted() + { + // GIVEN + BackOffState backOffState = StartedBackoffStateWithRetries(); + given(clock.instant()).willReturn(NOW.plusMillis(BACK_OFF).plusMillis(1)); + given(backOffExecution.nextBackOff()).willReturn(BACK_OFF); + + // WHEN + backOffState.isWaitingForNextRetry(); + + // THEN + assertThat(backOffState.isCompleted(OFFSET)).isFalse(); + } + + @Test + @DisplayName("A started BackOffState is not waiting for a retry if the time is due and the retry is completed") + void StartedBackOffStateIsNotWaitingForRetryIfTheTimeIsDueAndRetryIsCompleted() + { + // GIVEN + BackOffState backOffState = StartedBackoffStateWithRetries(); + given(clock.instant()).willReturn(NOW.plusMillis(BACK_OFF).plusMillis(1)); + given(backOffExecution.nextBackOff()).willReturn(BackOffExecution.STOP); + + // WHEN + boolean result = backOffState.isWaitingForNextRetry(); + + // THEN + assertThat(result).isFalse(); + } + + @Test + @DisplayName("A started BackOffState is started if the time is due and the retry is completed") + void StartedBackOffStateIsStartedIfTheTimeIsDueAndRetryIsCompleted() + { + // GIVEN + BackOffState backOffState = StartedBackoffStateWithRetries(); + given(clock.instant()).willReturn(NOW.plusMillis(BACK_OFF).plusMillis(1)); + given(backOffExecution.nextBackOff()).willReturn(BackOffExecution.STOP); + + // WHEN + backOffState.isWaitingForNextRetry(); + + // THEN + assertThat(backOffState.isStarted(OFFSET)).isTrue(); + } + + @Test + @DisplayName("A started BackOffState is completed if the time is due and the retry is completed") + void StartedBackOffStateIsCompletedIfTheTimeIsDueAndRetryIsCompleted() + { + // GIVEN + BackOffState backOffState = StartedBackoffStateWithRetries(); + given(clock.instant()).willReturn(NOW.plusMillis(BACK_OFF).plusMillis(1)); + given(backOffExecution.nextBackOff()).willReturn(BackOffExecution.STOP); + + // WHEN + backOffState.isWaitingForNextRetry(); + + // THEN + assertThat(backOffState.isCompleted(OFFSET)).isTrue(); + } + + @Test + @DisplayName("A started BackOffState is not waiting for a retry after a reset") + void StartedBackOffStateIsNotWaitingForRetryAfterReset() + { + // GIVEN + BackOffState backOffState = StartedBackoffStateWithRetries(); + + // WHEN + backOffState.reset(); + + // THEN + assertThat(backOffState.isWaitingForNextRetry()).isFalse(); + } + + @Test + @DisplayName("A started BackOffState is not started after a reset") + void StartedBackOffStateIsNotStartedAfterReset() + { + // GIVEN + BackOffState backOffState = StartedBackoffStateWithRetries(); + + // WHEN + backOffState.reset(); + + // THEN + assertThat(backOffState.isStarted(OFFSET)).isFalse(); + } +} diff --git a/src/test/java/de/juplo/kafka/ExampleConsumerTest.java 
b/src/test/java/de/juplo/kafka/ExampleConsumerTest.java index 590c9cdf..1f9713fa 100644 --- a/src/test/java/de/juplo/kafka/ExampleConsumerTest.java +++ b/src/test/java/de/juplo/kafka/ExampleConsumerTest.java @@ -12,6 +12,9 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; +import org.junit.jupiter.params.provider.ValueSource; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration; @@ -20,7 +23,10 @@ import org.springframework.boot.test.context.TestConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.util.backoff.BackOff; +import org.springframework.util.backoff.FixedBackOff; +import java.time.Clock; import java.time.Duration; import java.util.HashMap; import java.util.Map; @@ -29,8 +35,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import java.util.stream.IntStream; -import static de.juplo.kafka.ExampleConsumerTest.NUM_PARTITIONS; -import static de.juplo.kafka.ExampleConsumerTest.TOPIC; +import static de.juplo.kafka.ExampleConsumerTest.*; +import static org.assertj.core.api.Assertions.assertThat; @SpringBootTest( @@ -41,16 +47,24 @@ import static de.juplo.kafka.ExampleConsumerTest.TOPIC; }, properties = { "juplo.bootstrap-server=${spring.embedded.kafka.brokers}", + "juplo.consumer.poll-request-timeout=" + POLL_REQUEST_TIMEOUT_MS + "ms", + "juplo.consumer.max-poll-interval=" + MAX_POLL_INTERVALL_MS + "ms", + "juplo.consumer.max-time-per-record=" + MAX_TIME_PER_RECORD_MS + "ms", + "juplo.consumer.fetch-max-wait=" + FETCH_MAX_WAIT_MS + "ms", + "juplo.consumer.min-slack-per-poll-interval=" + MIN_SLACK_PER_POLL_INTERVAL_MS + "ms", + "juplo.consumer.num-retries=" + NUM_RETRIES, "spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.ByteArraySerializer", "logging.level.de.juplo.kafka=TRACE", }) @EmbeddedKafka(topics = TOPIC, partitions = NUM_PARTITIONS) public class ExampleConsumerTest { - @DisplayName("All messages are consumed") + @DisplayName("All messages are consumed as expected") @Test void testOnlyValidMessages() { + createExampleConsumer(); + sendValidMessage(0); sendValidMessage(1); sendValidMessage(2); @@ -75,13 +89,50 @@ public class ExampleConsumerTest Awaitility .await("All messages are consumed") .atMost(Duration.ofSeconds(5)) - .until(() -> mockRecordHandler.getNumMessagesHandled() == 20); + .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(20)); + } + + @DisplayName("Delayed messages are consumed as expected") + @ParameterizedTest(name = "delay for message-consumption: {0}") + @ValueSource(ints = { 10, 25, 50, 75, 100 }) + void testOnlyValidMessagesButAllDelayed(int delay) + { + createExampleConsumer(); + mockRecordHandler.normalRecordHandlingDelay = Duration.ofMillis(delay); + + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendValidMessage(3); + sendValidMessage(4); + sendValidMessage(5); + sendValidMessage(6); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + 
sendValidMessage(3); + sendValidMessage(4); + sendValidMessage(5); + sendValidMessage(6); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + + Awaitility + .await("All messages are consumed") + .atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(20)); } @DisplayName("A deserialization exception is skipped and all valid messages are consumed") @Test void testDeserializationException() { + createExampleConsumer(); + sendValidMessage(0); sendValidMessage(1); sendValidMessage(2); @@ -106,13 +157,15 @@ public class ExampleConsumerTest Awaitility .await("All valid messages are consumed") .atMost(Duration.ofSeconds(15)) - .until(() -> mockRecordHandler.getNumMessagesHandled() == 19); + .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(19)); } @DisplayName("A message, that triggers an unexpected exception in the domain-logic, exits the application") @Test void testUnexpectedDomainError() throws Exception { + createExampleConsumer(); + sendValidMessage(0); sendValidMessage(1); sendValidMessage(2); @@ -138,16 +191,324 @@ public class ExampleConsumerTest .await("The ConsumerRunnable is exited by an unexpected exception") .atMost(Duration.ofSeconds(5)) .pollInterval(Duration.ofMillis(250)) - .until(() -> isTerminatedExceptionally.get()); + .untilAsserted(() -> assertThat(isTerminatedExceptionally.get()).isTrue()); + } + + @DisplayName("A message, that triggers an non-retryable exception in the domain-logic, is skipped and all other messages are consumed") + @Test + void testNonRetryableDomainError() throws Exception + { + createExampleConsumer(); + + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendMessageThatTriggersNonRetriableExceptionInDomain(3); + sendValidMessage(4); + sendValidMessage(5); + sendValidMessage(6); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendValidMessage(3); + sendValidMessage(4); + sendValidMessage(5); + sendValidMessage(6); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + + Awaitility + .await("All other valid messages are consumed") + .atMost(Duration.ofSeconds(15)) + .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(19)); + } + + @DisplayName("A message, that triggers a retryable exception in the domain-logic, is retried and all messages are eventually consumed") + @ParameterizedTest(name = "Number of failures for the 1. 
message in partition 3: {0}") + @ValueSource(ints = { 1, 2, 3, 4, 5, 6 }) + void testOneMessageCausesRetryableDomainErrors(int numFailures) + { + createExampleConsumer(); + + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendMessageThatTriggersRetriableExceptionInDomain(3,numFailures); + sendValidMessage(4); + sendValidMessage(5); + sendValidMessage(6); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendValidMessage(3); + sendValidMessage(4); + sendValidMessage(5); + sendValidMessage(6); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + + Awaitility + .await("All messages are eventually consumed") + .atMost(Duration.ofSeconds(15)) + .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(20)); + } + + @DisplayName("All messages on a partition are delayed and a message, that triggers a retryable exception in the domain-logic, is retried and all messages are eventually consumed") + @ParameterizedTest(name = "Delay for normal messages: {0}ms") + @ValueSource(ints = { 10, 20, 50, 100, 150, 200 }) + void testOneMessageCausesRetryableDomainErrorsWhileAllMessagesAreDelayed(int delay) + { + createExampleConsumer(); + mockRecordHandler.normalRecordHandlingDelay = Duration.ofMillis(delay); + mockRecordHandler.exceptionalRecordHandlingDelay = Duration.ofMillis(100); + + sendValidMessage(3); + sendValidMessage(3); + sendValidMessage(3); + sendValidMessage(3); + sendValidMessage(3); + sendValidMessage(3); + sendValidMessage(3); + sendValidMessage(3); + sendValidMessage(3); + sendValidMessage(3); + sendValidMessage(3); + sendValidMessage(3); + sendValidMessage(3); + sendValidMessage(3); + sendValidMessage(3); + sendValidMessage(3); + sendValidMessage(3); + sendValidMessage(3); + sendMessageThatTriggersRetriableExceptionInDomain(3, 1); + sendValidMessage(3); + + Awaitility + .await("All messages are eventually consumed") + .atMost(Duration.ofSeconds(15)) + .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(20)); + } + + @DisplayName("A message, that triggers a retryable exception in the domain-logic, but fails too often, is skipped and all other messages are eventually consumed") + @Test + void testOneMessageCausesRetryableDomainErrors() + { + createExampleConsumer(); + + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendMessageThatTriggersRetriableExceptionInDomain(3,66); + sendValidMessage(4); + sendValidMessage(5); + sendValidMessage(6); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendValidMessage(3); + sendValidMessage(4); + sendValidMessage(5); + sendValidMessage(6); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + + Awaitility + .await("All other messages are eventually consumed") + .atMost(Duration.ofSeconds(15)) + .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(19)); + } + + @DisplayName("Several messages are triggering retryable exception in one partition, some of them fail so often, that they are skipped and all successful messages are eventually consumed") + @ParameterizedTest(name = "Delay for errors: {0}ms") + @ValueSource(ints = { 10, 20, 50, 100, 150, 200 }) + void testSeveralMessagesCausesRetryableDomainErrorsInOnePartition(int delay) + { + createExampleConsumer(); + mockRecordHandler.exceptionalRecordHandlingDelay = 
Duration.ofMillis(delay); + + sendValidMessage(3); + sendMessageThatTriggersRetriableExceptionInDomain(3, 4); + sendValidMessage(3); + sendValidMessage(3); + sendMessageThatTriggersRetriableExceptionInDomain(3, 6); + sendMessageThatTriggersRetriableExceptionInDomain(3, 1); + sendValidMessage(3); + sendMessageThatTriggersRetriableExceptionInDomain(3, 66); + sendMessageThatTriggersRetriableExceptionInDomain(3, 5); + sendValidMessage(3); + sendValidMessage(3); + sendMessageThatTriggersRetriableExceptionInDomain(3, 66); + sendMessageThatTriggersRetriableExceptionInDomain(3, 6); + sendMessageThatTriggersRetriableExceptionInDomain(3, 66); + sendMessageThatTriggersRetriableExceptionInDomain(3, 3); + sendValidMessage(3); + sendMessageThatTriggersRetriableExceptionInDomain(3, 66); + sendValidMessage(3); + sendMessageThatTriggersRetriableExceptionInDomain(3, 1); + sendValidMessage(3); + + Awaitility + .await("All other messages are eventually consumed") + .atMost(Duration.ofSeconds(15)) + .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(16)); + } + + @DisplayName("Three messages, that trigger retryable exceptions in the domain-logic, are retried and all messages are eventually consumed") + @ParameterizedTest(name = "Number of failures for the 1. message in partition 3: {0}, number of failures for the 1. message in partition 6: {1}, number of failures for the 2. message in partition 6: {2}") + @CsvSource({ "1,1,1", "6,3,4", "4,5,2", "1,2,3", "6,6,6" }) + void testThreeMessagesCauseRetryableDomainErrors( + int numFailuresForMessageA, + int numFailuresForMessageB, + int numFailuresForMessageC) + { + createExampleConsumer(); + + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendMessageThatTriggersRetriableExceptionInDomain(3,numFailuresForMessageA); + sendValidMessage(4); + sendValidMessage(5); + sendMessageThatTriggersRetriableExceptionInDomain(6,numFailuresForMessageB); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendValidMessage(3); + sendValidMessage(4); + sendValidMessage(5); + sendMessageThatTriggersRetriableExceptionInDomain(6,numFailuresForMessageC); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendValidMessage(3); + sendValidMessage(4); + sendValidMessage(5); + sendValidMessage(6); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + + Awaitility + .await("All messages are eventually consumed") + .atMost(Duration.ofSeconds(20)) + .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(30)); + } + + @DisplayName("Three messages, that trigger retryable exceptions in the domain-logic, are retried, but one of them fails too often and is skipped, still all other messages are eventually consumed") + @ParameterizedTest(name = "Number of failures for the 1. message in partition 3: {0}, number of failures for the 1. message in partition 6: {1}, number of failures for the 2. 
message in partition 6: {2}") + @CsvSource({ "66,3,4", "4,66,2", "1,2,66" }) + void testThreeMessagesCauseRetryableDomainErrorsAndOneFailsTooOften( + int numFailuresForMessageA, + int numFailuresForMessageB, + int numFailuresForMessageC) + { + createExampleConsumer(); + + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendMessageThatTriggersRetriableExceptionInDomain(3,numFailuresForMessageA); + sendValidMessage(4); + sendValidMessage(5); + sendMessageThatTriggersRetriableExceptionInDomain(6,numFailuresForMessageB); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendValidMessage(3); + sendValidMessage(4); + sendValidMessage(5); + sendMessageThatTriggersRetriableExceptionInDomain(6,numFailuresForMessageC); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendValidMessage(3); + sendValidMessage(4); + sendValidMessage(5); + sendValidMessage(6); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + + Awaitility + .await("All other messages are eventually consumed") + .atMost(Duration.ofSeconds(20)) + .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(29)); + } + + @DisplayName("A message, that triggers a retryable exception in the domain-logic, is retried 3 times with a fixed back-of and all messages are eventually consumed") + @ParameterizedTest(name = "Back-Off millis: {0}") + @ValueSource(ints = { 100, 250, 500, 1000 }) + void testOneMessageIsRetriedWithFixedBackOff(int backOffMillis) + { + BackOff backOff = new FixedBackOff(backOffMillis, 3); + createExampleConsumer(backOff); + + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendMessageThatTriggersRetriableExceptionInDomain(3, 3); + sendValidMessage(4); + sendValidMessage(5); + sendValidMessage(6); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendValidMessage(3); + sendValidMessage(4); + sendValidMessage(5); + sendValidMessage(6); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + + Awaitility + .await("All messages are eventually consumed") + .atMost(Duration.ofSeconds(15)) + .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(20)); } static final String ID = "TEST"; static final String TOPIC = "ExampleConsumerTest_TEST"; static final int NUM_PARTITIONS = 10; + static final int NUM_RETRIES = 6; + static final int POLL_REQUEST_TIMEOUT_MS = 50; + static final int MAX_POLL_INTERVALL_MS = 500; + static final int MAX_TIME_PER_RECORD_MS = 100; + static final int FETCH_MAX_WAIT_MS = 50; + static final int MIN_SLACK_PER_POLL_INTERVAL_MS = 100; @Autowired KafkaTemplate kafkaTemplate; + @Autowired ApplicationProperties properties; + @Autowired Clock clock; final LongSerializer serializer = new LongSerializer(); final long[] currentOffsets = new long[NUM_PARTITIONS]; @@ -160,8 +521,12 @@ public class ExampleConsumerTest ExampleConsumer exampleConsumer; - @BeforeEach - void createExampleConsumer(@Autowired ApplicationProperties properties) + void createExampleConsumer() + { + createExampleConsumer(new FixedBackOff(0l, properties.getConsumerProperties().getNumRetries())); + } + + void createExampleConsumer(BackOff backOff) { ApplicationConfiguration configuration = new ApplicationConfiguration(); Consumer consumer = 
configuration.kafkaConsumer(properties); @@ -171,9 +536,22 @@ public class ExampleConsumerTest TOPIC, consumer, mockRecordHandler, + clock, + properties.getConsumerProperties().getPollRequestTimeout(), + properties.getConsumerProperties().getMaxPollInterval(), + properties.getConsumerProperties().getMaxTimePerRecord(), + properties.getConsumerProperties().getMinSlackPerPollInterval(), + backOff, () -> isTerminatedExceptionally.set(true)); } + @BeforeEach + void resetParameters() + { + mockRecordHandler.normalRecordHandlingDelay = Duration.ofMillis(0); + mockRecordHandler.exceptionalRecordHandlingDelay = Duration.ofMillis(0); + } + @AfterEach void resetSetup(@Autowired AdminClient adminClient) throws InterruptedException, ExecutionException { @@ -218,6 +596,16 @@ public class ExampleConsumerTest send(partition, VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION); } + private void sendMessageThatTriggersNonRetriableExceptionInDomain(int partition) + { + send(partition, serializer.serialize(TOPIC,(long)VALUE_THAT_TRIGGERS_NONRETRIABLEEXCEPTION)); + } + + private void sendMessageThatTriggersRetriableExceptionInDomain(int partition, int numFailures) + { + send(partition, serializer.serialize(TOPIC,(long)VALUE_THAT_TRIGGERS_RETRIABLEEXCEPTION * numFailures)); + } + private void send(int partition, long message) { send(partition, serializer.serialize(TOPIC, message)); @@ -237,6 +625,9 @@ public class ExampleConsumerTest public final static int VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION = -1; + public final static int VALUE_THAT_TRIGGERS_RETRIABLEEXCEPTION = -2; + public final static int VALUE_THAT_TRIGGERS_NONRETRIABLEEXCEPTION = -3; + @TestConfiguration @@ -249,5 +640,11 @@ public class ExampleConsumerTest properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBroker); return AdminClient.create(properties); } + + @Bean + Clock clock() + { + return Clock.systemDefaultZone(); + } } } diff --git a/src/test/java/de/juplo/kafka/MockRecordHandler.java b/src/test/java/de/juplo/kafka/MockRecordHandler.java index 9f540f0e..c294256b 100644 --- a/src/test/java/de/juplo/kafka/MockRecordHandler.java +++ b/src/test/java/de/juplo/kafka/MockRecordHandler.java @@ -1,48 +1,123 @@ package de.juplo.kafka; -import lombok.Getter; -import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import static de.juplo.kafka.ExampleConsumerTest.VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION; +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; + +import static de.juplo.kafka.ExampleConsumerTest.*; -@RequiredArgsConstructor @Slf4j public class MockRecordHandler implements RecordHandler { - @Getter + private final Map retriableErrors = new HashMap<>(); + + Duration normalRecordHandlingDelay; + Duration exceptionalRecordHandlingDelay; + private int numMessagesHandled = 0; + public int getNumMessagesHandled() + { + return numMessagesHandled; + } + @Override public void handleRecord( String topic, Integer partition, Long offset, String key, - Long value) + Long value) throws RetriableErrorException, NonRetriableErrorException { if (value != null && value < 0) { - generateError(value); + generateError(new OffsetInPartition(offset, partition), value.intValue()); } + sleep(normalRecordHandlingDelay); numMessagesHandled++; log.trace("Handled {} messages so far", numMessagesHandled); } - private void generateError(long value) + private void generateError( + OffsetInPartition offset, + int value) throws RetriableErrorException, NonRetriableErrorException { if (value == VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION) { throw 
new RuntimeException("Unexpected application error!"); } - log.info("Not specifically mapped error: {}", value); + if (value == VALUE_THAT_TRIGGERS_NONRETRIABLEEXCEPTION) + { + throw new NonRetriableErrorException("Non-Retryable application error!"); + } + + if ((float)value % (float)VALUE_THAT_TRIGGERS_RETRIABLEEXCEPTION == 0f) + { + int totalOccurrences = value / VALUE_THAT_TRIGGERS_RETRIABLEEXCEPTION; + int occurrence = retriableErrors.compute( + offset, + (k, v) -> + { + if (v == null) + { + v = totalOccurrences; + } + else + { + v--; + } + + return v; + }); + + if (occurrence <= 0) + { + retriableErrors.remove(offset); + } + else + { + log.debug( + "Simulating occurrence #{} of {} for a retryable error at offset {} in partition {}", + totalOccurrences - occurrence + 1, + totalOccurrences, + offset.offset, + offset.partition); + sleep(exceptionalRecordHandlingDelay); + throw new RetriableErrorException("Retryable application error! Occurrence #" + (totalOccurrences - occurrence + 1)); + } + + log.info("Simulating a resolved retryable error after {} occurrences of the error", totalOccurrences); + } + else + { + log.warn("Not specifically mapped error: {}", value); + } + } + + private void sleep(Duration duration) + { + try + { + Thread.sleep(duration); + } + catch(InterruptedException e) + { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } } public void clear() { + retriableErrors.clear(); numMessagesHandled = 0; } + + + private static record OffsetInPartition(long offset, int partition) {} } -- 2.20.1
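
Not part of the patch itself, but perhaps useful for readers of it: a minimal sketch of how an application-level RecordHandler might map domain failures onto the two exception types introduced above, so that ExampleConsumer can decide between retrying (seek back, honour the BackOff) and skipping a record. The InventoryService collaborator, its exceptions, and the class name DomainRecordHandler are hypothetical placeholders and do not exist in this repository; only RecordHandler, RetriableErrorException and NonRetriableErrorException are taken from the patch.

package de.juplo.kafka;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;


/**
 * Illustrative sketch only: translates failures of a hypothetical downstream
 * service into the exception types that the consumer's error handling understands.
 */
@RequiredArgsConstructor
@Slf4j
public class DomainRecordHandler implements RecordHandler<String, Long>
{
  // Hypothetical collaborator -- not part of this repository.
  private final InventoryService inventoryService;

  @Override
  public void handleRecord(
      String topic,
      Integer partition,
      Long offset,
      String key,
      Long value) throws RetriableErrorException, NonRetriableErrorException
  {
    try
    {
      inventoryService.book(key, value);
    }
    catch (TransientBackendException e)
    {
      // Temporary infrastructure problem: signal a retry, so that the
      // consumer seeks back and tries again according to the configured BackOff.
      log.info("Transient failure for {}={}: {}", key, value, e.toString());
      throw new RetriableErrorException(e.getMessage());
    }
    catch (InvalidBookingException e)
    {
      // The record itself can never be processed: signal a skip.
      log.warn("Unprocessable record {}={}: {}", key, value, e.toString());
      throw new NonRetriableErrorException(e.getMessage());
    }
  }


  // Hypothetical domain types, declared here only to keep the sketch self-contained.
  public interface InventoryService
  {
    void book(String key, Long value) throws TransientBackendException, InvalidBookingException;
  }

  public static class TransientBackendException extends Exception
  {
    public TransientBackendException(String message) { super(message); }
  }

  public static class InvalidBookingException extends Exception
  {
    public InvalidBookingException(String message) { super(message); }
  }
}

Since the retry behaviour is injected as a BackOff bean, the FixedBackOff used in the patch could also be swapped for Spring's org.springframework.util.backoff.ExponentialBackOff if growing delays between retries are preferred; the consumer code would stay unchanged.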