X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FSumRebalanceListener.java;h=83cb759dbf5076ba45b58074ee5f7330ba5f64ba;hb=3495017ed2116f338c3342a313abdb7170683573;hp=1cd738ff33e4326c78745f14949bb60a46254991;hpb=c808810e9e33afe33b29f7fd3921023ecd15483d;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/SumRebalanceListener.java b/src/main/java/de/juplo/kafka/SumRebalanceListener.java index 1cd738f..83cb759 100644 --- a/src/main/java/de/juplo/kafka/SumRebalanceListener.java +++ b/src/main/java/de/juplo/kafka/SumRebalanceListener.java @@ -9,7 +9,6 @@ import java.time.Clock; import java.time.Duration; import java.time.Instant; import java.util.Collection; -import java.util.Map; @RequiredArgsConstructor @@ -34,17 +33,17 @@ public class SumRebalanceListener implements PollIntervalAwareConsumerRebalanceL Integer partition = tp.partition(); Long offset = consumer.position(tp); log.info("{} - adding partition: {}, offset={}", id, partition, offset); - StatisticsDocument document = + StateDocument document = repository .findById(Integer.toString(partition)) - .orElse(new StatisticsDocument(partition)); + .orElse(new StateDocument(partition)); if (document.offset >= 0) { // Only seek, if a stored offset was found // Otherwise: Use initial offset, generated by Kafka consumer.seek(tp, document.offset); } - handler.addPartition(partition, document.statistics); + handler.addPartition(partition, document.state); }); } @@ -60,8 +59,7 @@ public class SumRebalanceListener implements PollIntervalAwareConsumerRebalanceL id, partition, newOffset); - Map> removed = handler.removePartition(partition); - repository.save(new StatisticsDocument(partition, removed, consumer.position(tp))); + repository.save(new StateDocument(partition, handler.removePartition(partition), newOffset)); }); } @@ -72,10 +70,10 @@ public class SumRebalanceListener implements PollIntervalAwareConsumerRebalanceL if (lastCommit.plus(commitInterval).isBefore(clock.instant())) { log.debug("Storing data and offsets, last commit: {}", lastCommit); - handler.getSeen().forEach((partiton, statistics) -> repository.save( - new StatisticsDocument( + handler.getState().forEach((partiton, sumBusinessLogic) -> repository.save( + new StateDocument( partiton, - statistics, + sumBusinessLogic.getState(), consumer.position(new TopicPartition(topic, partiton))))); lastCommit = clock.instant(); }