From d8ec8ee7ea93e801051ce3cd6f83db2aa20e6b95 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 10 Sep 2022 19:19:15 +0200 Subject: [PATCH] Anwendung auf den Default-ErrorHandler umgestellt MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit * Die Tests mussten entsprechend angepasst werden, da die Methode `EndlessConsumer.exitStatus()` aufgrund der Umstellung nicht mehr verfügbar ist. * Die Logik der Tests wurde aber nicht geändert. * Die Tests zeigen nur, dass die Anwendung sich nicht wie zuvor beendet. * Durch manuelle Versuchen wurden folgende Erkenntnisse gewonnen: ** Im Fall eines Deserialisierungs-Fehlers begibt sich die Anwendung in eine Endlosschleife. ** Da, in der Fehlersituation keine Commits durchgeführt werden, hängt die Anwendung dann auch nach einem Neustart weiter in der Fehlerschleife. ** Im Fall eines Logik-Fehlers Startet ein Back-Off mit 10 Versuchen. ** Dabei werden nach jedem Fehler die Offsets für alle Partitionen für die der letzte `poll()` Nachrichten geliefert hatte, die noch nicht verarbeitet wurden, auf die nächste unverarbeitete Nachricht zurück gesetzt und anchließend wird `poll()` neu ausgeführt. ** Nach dem letzten Versuch springt die Anwendung über den Fehler hinweg. --- .../juplo/kafka/ApplicationConfiguration.java | 8 -- .../juplo/kafka/ApplicationErrorHandler.java | 94 ------------------- .../kafka/ApplicationHealthIndicator.java | 15 +-- .../java/de/juplo/kafka/EndlessConsumer.java | 10 -- .../juplo/kafka/GenericApplicationTests.java | 32 +++---- 5 files changed, 16 insertions(+), 143 deletions(-) delete mode 100644 src/main/java/de/juplo/kafka/ApplicationErrorHandler.java diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 1755747..c09eec3 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -46,16 +46,9 @@ public class ApplicationConfiguration kafkaProperties.getClientId()); } - @Bean - public ApplicationErrorHandler applicationErrorHandler() - { - return new ApplicationErrorHandler(); - } - @Bean public EndlessConsumer endlessConsumer( RecordHandler recordHandler, - ApplicationErrorHandler errorHandler, KafkaProperties kafkaProperties, KafkaListenerEndpointRegistry endpointRegistry) { @@ -63,7 +56,6 @@ public class ApplicationConfiguration new EndlessConsumer( kafkaProperties.getClientId(), endpointRegistry, - errorHandler, recordHandler); } } diff --git a/src/main/java/de/juplo/kafka/ApplicationErrorHandler.java b/src/main/java/de/juplo/kafka/ApplicationErrorHandler.java deleted file mode 100644 index 52c6a0c..0000000 --- a/src/main/java/de/juplo/kafka/ApplicationErrorHandler.java +++ /dev/null @@ -1,94 +0,0 @@ -package de.juplo.kafka; - -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.common.TopicPartition; -import org.springframework.kafka.listener.CommonErrorHandler; -import org.springframework.kafka.listener.MessageListenerContainer; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; - - -@Slf4j -public class ApplicationErrorHandler implements CommonErrorHandler -{ - private Exception exception; - private boolean ack = true; - - @Override - public boolean remainingRecords() - { - return true; - } - - @Override - public void handleOtherException( - Exception thrownException, - Consumer consumer, - MessageListenerContainer container, - boolean batchListener) - { - rememberExceptionAndStopContainer(thrownException, container); - } - - @Override - public void handleRemaining( - Exception thrownException, - List> records, - Consumer consumer, - MessageListenerContainer container) - { - Map offsets = new HashMap<>(); - records.forEach(record -> - offsets.computeIfAbsent( - new TopicPartition(record.topic(), record.partition()), - offset -> record.offset())); - offsets.forEach((tp, offset) -> consumer.seek(tp, offset)); - rememberExceptionAndStopContainer(thrownException, container); - } - - @Override - public void handleBatch( - Exception thrownException, - ConsumerRecords data, - Consumer consumer, - MessageListenerContainer container, - Runnable invokeListener) - { - // Do not commit the polled offsets on a logic-error - ack = false; - rememberExceptionAndStopContainer(thrownException, container); - } - - private void rememberExceptionAndStopContainer( - Exception exception, - MessageListenerContainer container) - { - log.error("{}, stopping container {} abnormally", exception, container); - this.exception = exception; - container.stopAbnormally(() -> log.info("{} is stopped", container)); - } - - @Override - public boolean isAckAfterHandle() - { - return ack; - } - - - public Optional getException() - { - return Optional.ofNullable(exception); - } - - public void clearState() - { - this.exception = null; - this.ack = true; - } -} diff --git a/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java b/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java index ab9782c..e215c69 100644 --- a/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java +++ b/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java @@ -16,17 +16,8 @@ public class ApplicationHealthIndicator implements HealthIndicator @Override public Health health() { - try - { - return consumer - .exitStatus() - .map(Health::down) - .orElse(Health.outOfService()) - .build(); - } - catch (IllegalStateException e) - { - return Health.up().build(); - } + return consumer.running() + ? Health.up().build() + : Health.down().build(); } } diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index 655151a..27c1e44 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -25,7 +25,6 @@ public class EndlessConsumer { private final String id; private final KafkaListenerEndpointRegistry registry; - private final ApplicationErrorHandler errorHandler; private final RecordHandler recordHandler; private long consumed = 0; @@ -83,7 +82,6 @@ public class EndlessConsumer throw new IllegalStateException("Consumer instance " + id + " is already running!"); log.info("{} - Starting - consumed {} messages before", id, consumed); - errorHandler.clearState(); registry.getListenerContainer(id).start(); } @@ -101,12 +99,4 @@ public class EndlessConsumer { return registry.getListenerContainer(id).isRunning(); } - - public Optional exitStatus() - { - if (running()) - throw new IllegalStateException("No exit-status available: Consumer instance " + id + " is running!"); - - return errorHandler.getException(); - } } diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java index 49ddb47..66a80ad 100644 --- a/src/test/java/de/juplo/kafka/GenericApplicationTests.java +++ b/src/test/java/de/juplo/kafka/GenericApplicationTests.java @@ -109,9 +109,9 @@ abstract class GenericApplicationTests assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets); }); - assertThatExceptionOfType(IllegalStateException.class) - .isThrownBy(() -> endlessConsumer.exitStatus()) - .describedAs("Consumer should still be running"); + assertThat(endlessConsumer.running()) + .describedAs("Consumer should still be running") + .isTrue(); endlessConsumer.stop(); recordGenerator.assertBusinessLogic(); @@ -144,12 +144,9 @@ abstract class GenericApplicationTests .describedAs("Received not all sent events") .isLessThan(numberOfGeneratedMessages); - assertThatNoException() - .describedAs("Consumer should not be running") - .isThrownBy(() -> endlessConsumer.exitStatus()); - assertThat(endlessConsumer.exitStatus()) - .describedAs("Consumer should have exited abnormally") - .containsInstanceOf(RecordDeserializationException.class); + assertThat(endlessConsumer.running()) + .describedAs("Consumer should have exited") + .isFalse(); recordGenerator.assertBusinessLogic(); } @@ -177,12 +174,9 @@ abstract class GenericApplicationTests assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets); - assertThatNoException() + assertThat(endlessConsumer.running()) .describedAs("Consumer should not be running") - .isThrownBy(() -> endlessConsumer.exitStatus()); - assertThat(endlessConsumer.exitStatus()) - .describedAs("Consumer should have exited abnormally") - .containsInstanceOf(RuntimeException.class); + .isFalse(); recordGenerator.assertBusinessLogic(); } @@ -392,10 +386,10 @@ abstract class GenericApplicationTests return new TestRecordHandler(applicationRecordHandler); } - @Bean(destroyMethod = "close") - public org.apache.kafka.clients.consumer.Consumer kafkaConsumer(ConsumerFactory factory) - { - return factory.createConsumer(); - } + @Bean(destroyMethod = "close") + public org.apache.kafka.clients.consumer.Consumer kafkaConsumer(ConsumerFactory factory) + { + return factory.createConsumer(); + } } } -- 2.20.1