X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationRebalanceListener.java;h=8e8464fbb09237f56257a69922c90c39f8abe09f;hb=348477f6e4ff77e78bf7b6db66e4716663c9512d;hp=109b205b850dcf6e6c2d27b08c3169b5af74b3a8;hpb=6bfbab41a28c8fb6252d328954f5b7129b24ec25;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java index 109b205..8e8464f 100644 --- a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java +++ b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java @@ -13,7 +13,7 @@ import java.util.*; @RequiredArgsConstructor @Slf4j -public class ApplicationRebalanceListener implements PollIntervalAwareConsumerRebalanceListener +public class ApplicationRebalanceListener implements RebalanceListener { private final ApplicationRecordHandler recordHandler; private final AdderResults adderResults; @@ -22,7 +22,7 @@ public class ApplicationRebalanceListener implements PollIntervalAwareConsumerRe private final String topic; private final Clock clock; private final Duration commitInterval; - private final Consumer consumer; + private final Consumer consumer; private final Set partitions = new HashSet<>(); @@ -97,7 +97,7 @@ public class ApplicationRebalanceListener implements PollIntervalAwareConsumerRe } else { - log.info("Offset commits are disabled! Last commit: {}", lastCommit); + log.info("{} - Offset commits are disabled! Last commit: {}", id, lastCommit); } }); } @@ -108,13 +108,13 @@ public class ApplicationRebalanceListener implements PollIntervalAwareConsumerRe { if (!commitsEnabled) { - log.info("Offset commits are disabled! Last commit: {}", lastCommit); + log.info("{} - Offset commits are disabled! Last commit: {}", id, lastCommit); return; } if (lastCommit.plus(commitInterval).isBefore(clock.instant())) { - log.debug("Storing data and offsets, last commit: {}", lastCommit); + log.debug("{} - Storing data and offsets, last commit: {}", id, lastCommit); partitions.forEach(partition -> stateRepository.save( new StateDocument( partition,