X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationRebalanceListener.java;fp=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationRebalanceListener.java;h=444b7b7139d2d6845a02e42fb109e8b77f871b37;hb=f1eec82fb197f9fc7906eb9a90d75468e9e4356f;hp=542af2d563a42466f971546f01e7f38eb9a5e1ed;hpb=a2e8fc924e5b472d6b90c42d311514f91ea452f1;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java index 542af2d..444b7b7 100644 --- a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java +++ b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java @@ -22,7 +22,7 @@ public class ApplicationRebalanceListener implements PollIntervalAwareConsumerRe private final String topic; private final Clock clock; private final Duration commitInterval; - private final Consumer consumer; + private final Consumer consumer; private Instant lastCommit = Instant.EPOCH; private boolean commitsEnabled = true; @@ -85,10 +85,10 @@ public class ApplicationRebalanceListener implements PollIntervalAwareConsumerRe if (lastCommit.plus(commitInterval).isBefore(clock.instant())) { log.debug("Storing data and offsets, last commit: {}", lastCommit); - recordHandler.getState().forEach((partiton, adder) -> stateRepository.save( + recordHandler.getState().forEach((partiton, state) -> stateRepository.save( new StateDocument( partiton, - adder.getState(), + state, consumer.position(new TopicPartition(topic, partiton))))); lastCommit = clock.instant(); }