From 384ed293b9d6d71f68031f92bd7bf60670a7ba4b Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Fri, 26 Aug 2022 13:52:48 +0200 Subject: [PATCH] Fehler im Commit-Verhalten korrigiert: Bei Logik-Fehler, kein Commit MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit * Die Implementierung sieht vor, dass bei einer unerwarteten Exception (i.d.R. ein Fehler in der Fachlogik) kein Commit durchgeführt wird. * Ansonsten müsste in der Situation ein expliziter Seek der Offstes auf die Positionen der vor dem Auftreten des Fehlers verarbeiteten Nachrichten durchgeführt werden, damit es nicht zu einem Verlust von Nachrichten kommt. * Dieses Verhalten wurde durch die Verlagerung des Commits in den Rebalance-Listener unterwandert, da der Commit dort auch im Falle einer unerwarteten Exception durchgeführt wurde. * Als Korrektur wurde hier eine Methode eingeführt, über die der Commit im Rebalance in dieser Situation unterdrückt werden kann. --- .../kafka/ApplicationRebalanceListener.java | 34 ++++++++++++++----- .../CommittingConsumerRebalanceListener.java | 10 ++++++ .../java/de/juplo/kafka/EndlessConsumer.java | 6 ++-- .../juplo/kafka/GenericApplicationTests.java | 5 ++- 4 files changed, 41 insertions(+), 14 deletions(-) create mode 100644 src/main/java/de/juplo/kafka/CommittingConsumerRebalanceListener.java diff --git a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java index eef0d00..d319295 100644 --- a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java +++ b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java @@ -3,7 +3,6 @@ package de.juplo.kafka; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.common.TopicPartition; import java.util.*; @@ -11,7 +10,7 @@ import java.util.*; @RequiredArgsConstructor @Slf4j -public class ApplicationRebalanceListener implements ConsumerRebalanceListener +public class ApplicationRebalanceListener implements CommittingConsumerRebalanceListener { private final ApplicationRecordHandler recordHandler; private final AdderResults adderResults; @@ -21,6 +20,8 @@ public class ApplicationRebalanceListener implements ConsumerRebalanceListener private final Set partitions = new HashSet<>(); + private boolean commitsEnabled = true; + @Override public void onPartitionsAssigned(Collection partitions) { @@ -50,14 +51,17 @@ public class ApplicationRebalanceListener implements ConsumerRebalanceListener @Override public void onPartitionsRevoked(Collection partitions) { - log.info("{} - Commiting offsets for all previously assigned partitions", id); - try - { - consumer.commitSync(); - } - catch (Exception e) + if (commitsEnabled) { - log.warn("{} - Could not commit offsets in onPartitionsRevoked():", id, e); + log.info("{} - Commiting offsets for all previously assigned partitions", id); + try + { + consumer.commitSync(); + } + catch (Exception e) + { + log.warn("{} - Could not commit offsets in onPartitionsRevoked():", id, e); + } } partitions.forEach(tp -> @@ -79,4 +83,16 @@ public class ApplicationRebalanceListener implements ConsumerRebalanceListener stateRepository.save(new StateDocument(partition, state, results)); }); } + + @Override + public void enableCommits() + { + commitsEnabled = true; + } + + @Override + public void disableCommits() + { + commitsEnabled = false; + } } diff --git a/src/main/java/de/juplo/kafka/CommittingConsumerRebalanceListener.java b/src/main/java/de/juplo/kafka/CommittingConsumerRebalanceListener.java new file mode 100644 index 0000000..8aa92c0 --- /dev/null +++ b/src/main/java/de/juplo/kafka/CommittingConsumerRebalanceListener.java @@ -0,0 +1,10 @@ +package de.juplo.kafka; + +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; + + +public interface CommittingConsumerRebalanceListener extends ConsumerRebalanceListener +{ + void enableCommits(); + void disableCommits(); +} diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index f0e74d3..63fc10f 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -25,7 +25,7 @@ public class EndlessConsumer implements Runnable private final String id; private final String topic; private final Consumer consumer; - private final ConsumerRebalanceListener rebalanceListener; + private final CommittingConsumerRebalanceListener rebalanceListener; private final RecordHandler recordHandler; private final Lock lock = new ReentrantLock(); @@ -42,6 +42,7 @@ public class EndlessConsumer implements Runnable try { log.info("{} - Subscribing to topic {}", id, topic); + rebalanceListener.enableCommits(); consumer.subscribe(Arrays.asList(topic), rebalanceListener); while (true) @@ -89,7 +90,8 @@ public class EndlessConsumer implements Runnable } catch(Exception e) { - log.error("{} - Unexpected error: {}", id, e.toString(), e); + log.error("{} - Unexpected error: {}, disabling commits", id, e.toString(), e); + rebalanceListener.disableCommits(); shutdown(e); } finally diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java index 595ef89..7335770 100644 --- a/src/test/java/de/juplo/kafka/GenericApplicationTests.java +++ b/src/test/java/de/juplo/kafka/GenericApplicationTests.java @@ -2,7 +2,6 @@ package de.juplo.kafka; import com.mongodb.client.MongoClient; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; @@ -67,7 +66,7 @@ abstract class GenericApplicationTests @Autowired MongoProperties mongoProperties; @Autowired - ConsumerRebalanceListener rebalanceListener; + CommittingConsumerRebalanceListener rebalanceListener; @Autowired RecordHandler recordHandler; @@ -213,7 +212,7 @@ abstract class GenericApplicationTests Long expected = offsetsToCheck.get(tp) + 1; log.debug("Checking, if the offset {} for {} is at most {}", offset, tp, expected); assertThat(offset) - .describedAs("Committed offset corresponds to the offset of the consumer") + .describedAs("Committed offset must be at most equal to the offset of the consumer") .isLessThanOrEqualTo(expected); isOffsetBehindSeen.add(offset < expected); }); -- 2.20.1