import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
import java.util.Collection;
import java.util.Map;
@RequiredArgsConstructor
@Slf4j
-public class WordcountRebalanceListener implements ConsumerRebalanceListener
+public class WordcountRebalanceListener implements PollIntervalAwareConsumerRebalanceListener
{
private final WordcountRecordHandler handler;
private final PartitionStatisticsRepository repository;
private final String id;
+ private final String topic;
+ private final Clock clock;
+ private final Duration commitInterval;
private final Consumer<String, String> consumer;
+ private Instant lastCommit = Instant.EPOCH;
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions)
repository.save(new StatisticsDocument(partition, removed, consumer.position(tp)));
});
}
+
+
+ @Override
+ public void beforeNextPoll()
+ {
+ if (lastCommit.plus(commitInterval).isBefore(clock.instant()))
+ {
+ log.debug("Storing data and offsets, last commit: {}", lastCommit);
+ handler.getSeen().forEach((partiton, statistics) -> repository.save(
+ new StatisticsDocument(
+ partiton,
+ statistics,
+ consumer.position(new TopicPartition(topic, partiton)))));
+ lastCommit = clock.instant();
+ }
+ }
}