+ partitions.forEach(tp ->
+ {
+ Integer partition = tp.partition();
+ Long newOffset = consumer.position(tp);
+ log.info(
+ "{} - removing partition: {}, offset of next message {})",
+ id,
+ partition,
+ newOffset);
+ Map<String, Map<String, Long>> removed = handler.removePartition(partition);
+ repository.save(new StatisticsDocument(partition, removed, consumer.position(tp)));
+ });
+ }
+
+
+ @Override
+ public void beforeNextPoll()
+ {
+ if (lastCommit.plus(commitInterval).isBefore(clock.instant()))
+ {
+ log.debug("Storing data and offsets, last commit: {}", lastCommit);
+ handler.getSeen().forEach((partiton, statistics) -> repository.save(
+ new StatisticsDocument(
+ partiton,
+ statistics,
+ consumer.position(new TopicPartition(topic, partiton)))));
+ lastCommit = clock.instant();
+ }