X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FWordcountRebalanceListener.java;fp=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FWordcountRebalanceListener.java;h=9f2fc0f75dc193dfb72438f01a970d857e63875b;hb=9511a89368c96d0b5f09d55adaaed5515c578dcc;hp=9a69c8f034af2093764e0024ed9d0f5f6cc6937d;hpb=fc682d9890787ef363b3e189f6f880a043f3c541;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/WordcountRebalanceListener.java b/src/main/java/de/juplo/kafka/WordcountRebalanceListener.java index 9a69c8f..9f2fc0f 100644 --- a/src/main/java/de/juplo/kafka/WordcountRebalanceListener.java +++ b/src/main/java/de/juplo/kafka/WordcountRebalanceListener.java @@ -3,22 +3,28 @@ package de.juplo.kafka; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.common.TopicPartition; +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; import java.util.Collection; import java.util.Map; @RequiredArgsConstructor @Slf4j -public class WordcountRebalanceListener implements ConsumerRebalanceListener +public class WordcountRebalanceListener implements PollIntervalAwareConsumerRebalanceListener { private final WordcountRecordHandler handler; private final PartitionStatisticsRepository repository; private final String id; + private final String topic; + private final Clock clock; + private final Duration commitInterval; private final Consumer consumer; + private Instant lastCommit = Instant.EPOCH; @Override public void onPartitionsAssigned(Collection partitions) @@ -58,4 +64,20 @@ public class WordcountRebalanceListener implements ConsumerRebalanceListener repository.save(new StatisticsDocument(partition, removed, consumer.position(tp))); }); } + + + @Override + public void beforeNextPoll() + { + if (lastCommit.plus(commitInterval).isBefore(clock.instant())) + { + log.debug("Storing data and offsets, last commit: {}", lastCommit); + handler.getSeen().forEach((partiton, statistics) -> repository.save( + new StatisticsDocument( + partiton, + statistics, + consumer.position(new TopicPartition(topic, partiton))))); + lastCommit = clock.instant(); + } + } }