X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationRebalanceListener.java;fp=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationRebalanceListener.java;h=eef0d00d73e07f8eda6053d3334498f439e6f7d7;hb=5805651c16e07a0710b88c2822894941f67c313e;hp=63d57dfb1bde831013169d45476ff5df946508c0;hpb=627763878b235ba168c7f55a1ef448851b027bfc;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java index 63d57df..eef0d00 100644 --- a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java +++ b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java @@ -3,6 +3,7 @@ package de.juplo.kafka; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.common.TopicPartition; import java.util.*; @@ -10,7 +11,7 @@ import java.util.*; @RequiredArgsConstructor @Slf4j -public class ApplicationRebalanceListener implements RebalanceListener +public class ApplicationRebalanceListener implements ConsumerRebalanceListener { private final ApplicationRecordHandler recordHandler; private final AdderResults adderResults; @@ -20,8 +21,6 @@ public class ApplicationRebalanceListener implements RebalanceListener private final Set partitions = new HashSet<>(); - private boolean commitsEnabled = true; - @Override public void onPartitionsAssigned(Collection partitions) { @@ -51,17 +50,14 @@ public class ApplicationRebalanceListener implements RebalanceListener @Override public void onPartitionsRevoked(Collection partitions) { - if (commitsEnabled) + log.info("{} - Commiting offsets for all previously assigned partitions", id); + try { - log.info("{} - Commiting offsets for all previously assigned partitions", id); - try - { - consumer.commitSync(); - } - catch (Exception e) - { - log.warn("{} - Could not commit offsets in onPartitionsRevoked():", id, e); - } + consumer.commitSync(); + } + catch (Exception e) + { + log.warn("{} - Could not commit offsets in onPartitionsRevoked():", id, e); } partitions.forEach(tp -> @@ -83,16 +79,4 @@ public class ApplicationRebalanceListener implements RebalanceListener stateRepository.save(new StateDocument(partition, state, results)); }); } - - @Override - public void enableCommits() - { - commitsEnabled = true; - } - - @Override - public void disableCommits() - { - commitsEnabled = false; - } }