X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FSumRebalanceListener.java;fp=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FSumRebalanceListener.java;h=83cb759dbf5076ba45b58074ee5f7330ba5f64ba;hb=3495017ed2116f338c3342a313abdb7170683573;hp=be752aec1d29748b9a1f7572f03862218a8aefda;hpb=5f35354fd694f78599d66ee9e01fb4c0d89cc5bb;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/SumRebalanceListener.java b/src/main/java/de/juplo/kafka/SumRebalanceListener.java index be752ae..83cb759 100644 --- a/src/main/java/de/juplo/kafka/SumRebalanceListener.java +++ b/src/main/java/de/juplo/kafka/SumRebalanceListener.java @@ -43,7 +43,7 @@ public class SumRebalanceListener implements PollIntervalAwareConsumerRebalanceL // Otherwise: Use initial offset, generated by Kafka consumer.seek(tp, document.offset); } - handler.addPartition(partition, document); + handler.addPartition(partition, document.state); }); } @@ -59,7 +59,7 @@ public class SumRebalanceListener implements PollIntervalAwareConsumerRebalanceL id, partition, newOffset); - repository.save(handler.removePartition(partition)); + repository.save(new StateDocument(partition, handler.removePartition(partition), newOffset)); }); } @@ -70,10 +70,10 @@ public class SumRebalanceListener implements PollIntervalAwareConsumerRebalanceL if (lastCommit.plus(commitInterval).isBefore(clock.instant())) { log.debug("Storing data and offsets, last commit: {}", lastCommit); - handler.getSeen().forEach((partiton, statistics) -> repository.save( + handler.getState().forEach((partiton, sumBusinessLogic) -> repository.save( new StateDocument( partiton, - statistics, + sumBusinessLogic.getState(), consumer.position(new TopicPartition(topic, partiton))))); lastCommit = clock.instant(); }