X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FSumRebalanceListener.java;fp=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FSumRebalanceListener.java;h=be752aec1d29748b9a1f7572f03862218a8aefda;hb=5f35354fd694f78599d66ee9e01fb4c0d89cc5bb;hp=1cd738ff33e4326c78745f14949bb60a46254991;hpb=c808810e9e33afe33b29f7fd3921023ecd15483d;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/SumRebalanceListener.java b/src/main/java/de/juplo/kafka/SumRebalanceListener.java index 1cd738f..be752ae 100644 --- a/src/main/java/de/juplo/kafka/SumRebalanceListener.java +++ b/src/main/java/de/juplo/kafka/SumRebalanceListener.java @@ -9,7 +9,6 @@ import java.time.Clock; import java.time.Duration; import java.time.Instant; import java.util.Collection; -import java.util.Map; @RequiredArgsConstructor @@ -34,17 +33,17 @@ public class SumRebalanceListener implements PollIntervalAwareConsumerRebalanceL Integer partition = tp.partition(); Long offset = consumer.position(tp); log.info("{} - adding partition: {}, offset={}", id, partition, offset); - StatisticsDocument document = + StateDocument document = repository .findById(Integer.toString(partition)) - .orElse(new StatisticsDocument(partition)); + .orElse(new StateDocument(partition)); if (document.offset >= 0) { // Only seek, if a stored offset was found // Otherwise: Use initial offset, generated by Kafka consumer.seek(tp, document.offset); } - handler.addPartition(partition, document.statistics); + handler.addPartition(partition, document); }); } @@ -60,8 +59,7 @@ public class SumRebalanceListener implements PollIntervalAwareConsumerRebalanceL id, partition, newOffset); - Map> removed = handler.removePartition(partition); - repository.save(new StatisticsDocument(partition, removed, consumer.position(tp))); + repository.save(handler.removePartition(partition)); }); } @@ -73,7 +71,7 @@ public class SumRebalanceListener implements PollIntervalAwareConsumerRebalanceL { log.debug("Storing data and offsets, last commit: {}", lastCommit); handler.getSeen().forEach((partiton, statistics) -> repository.save( - new StatisticsDocument( + new StateDocument( partiton, statistics, consumer.position(new TopicPartition(topic, partiton)))));