X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationRebalanceListener.java;fp=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationRebalanceListener.java;h=444b7b7139d2d6845a02e42fb109e8b77f871b37;hb=f1eec82fb197f9fc7906eb9a90d75468e9e4356f;hp=59b420a9492d11ae1ee91c6126eb9c576492fd9e;hpb=824f55bc813acebd4ea67ccf7f22518a31351076;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java index 59b420a..444b7b7 100644 --- a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java +++ b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java @@ -25,6 +25,7 @@ public class ApplicationRebalanceListener implements PollIntervalAwareConsumerRe private final Consumer consumer; private Instant lastCommit = Instant.EPOCH; + private boolean commitsEnabled = true; @Override public void onPartitionsAssigned(Collection partitions) @@ -59,8 +60,15 @@ public class ApplicationRebalanceListener implements PollIntervalAwareConsumerRe id, partition, offset); - Map removed = recordHandler.removePartition(partition); - stateRepository.save(new StateDocument(partition, removed, offset)); + if (commitsEnabled) + { + Map removed = recordHandler.removePartition(partition); + stateRepository.save(new StateDocument(partition, removed, offset)); + } + else + { + log.info("Offset commits are disabled! Last commit: {}", lastCommit); + } }); } @@ -68,6 +76,12 @@ public class ApplicationRebalanceListener implements PollIntervalAwareConsumerRe @Override public void beforeNextPoll() { + if (!commitsEnabled) + { + log.info("Offset commits are disabled! Last commit: {}", lastCommit); + return; + } + if (lastCommit.plus(commitInterval).isBefore(clock.instant())) { log.debug("Storing data and offsets, last commit: {}", lastCommit); @@ -79,4 +93,16 @@ public class ApplicationRebalanceListener implements PollIntervalAwareConsumerRe lastCommit = clock.instant(); } } + + @Override + public void enableCommits() + { + commitsEnabled = true; + } + + @Override + public void disableCommits() + { + commitsEnabled = false; + } }