X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationRebalanceListener.java;h=f3e6c8af018299efc9e3e027b45a14f76b29c5a1;hb=ce92719ebd18b3c34a3aa8ca60cf67f6c3fbd8b2;hp=b517d3574efc3b2c4b0c022c9f78a60e7084fe44;hpb=a4bbf334910cb37fabadb720cd0ee7f2a21ecaa7;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java index b517d35..f3e6c8a 100644 --- a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java +++ b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java @@ -13,7 +13,7 @@ import java.util.*; @RequiredArgsConstructor @Slf4j -public class ApplicationRebalanceListener implements PollIntervalAwareConsumerRebalanceListener +public class ApplicationRebalanceListener implements RebalanceListener { private final ApplicationRecordHandler recordHandler; private final AdderResults adderResults; @@ -74,15 +74,15 @@ public class ApplicationRebalanceListener implements PollIntervalAwareConsumerRe Integer partition = tp.partition(); log.info("{} - removing partition: {}", id, partition); this.partitions.remove(partition); - Long offset = consumer.position(tp); if (commitsEnabled) { + Map state = recordHandler.removePartition(partition); + Long offset = consumer.position(tp); log.info( - "{} - Storing {} as offset of next message for partition {}", + "{} - offset of next unseen message for partition {} is {}", id, - offset, - partition); - Map state = recordHandler.removePartition(partition); + partition, + offset); for (String user : state.keySet()) { log.info( @@ -97,7 +97,7 @@ public class ApplicationRebalanceListener implements PollIntervalAwareConsumerRe } else { - log.info("Offset commits are disabled! Last commit: {}", lastCommit); + log.info("{} - Offset commits are disabled! Last commit: {}", id, lastCommit); } }); } @@ -108,19 +108,37 @@ public class ApplicationRebalanceListener implements PollIntervalAwareConsumerRe { if (!commitsEnabled) { - log.info("Offset commits are disabled! Last commit: {}", lastCommit); + log.info("{} - Offset commits are disabled! Last commit: {}", id, lastCommit); return; } if (lastCommit.plus(commitInterval).isBefore(clock.instant())) { - log.debug("Storing data and offsets, last commit: {}", lastCommit); - partitions.forEach(partition -> stateRepository.save( - new StateDocument( + partitions + .stream() + .forEach(partition -> + { + log.info("{} - persisting state & offset for partition: {}", id, partition); + Map state = recordHandler.getState(partition).getState(); + Long offset = consumer.position(new TopicPartition(topic, partition)); + log.info( + "{} - offset of next unseen message for partition {} is {}", + id, + partition, + offset); + for (String user : state.keySet()) + { + log.info( + "{} - Saved state for partition={}|user={}: {}", + id, partition, - recordHandler.getState(partition).getState(), - adderResults.getState(partition), - consumer.position(new TopicPartition(topic, partition))))); + user, + state.get(user)); + } + Map> results = adderResults.getState(partition); + stateRepository.save(new StateDocument(partition, state, results, offset)); + }); + lastCommit = clock.instant(); } }