+ void seekToEnd()
+ {
+ offsetConsumer.assign(partitions());
+ partitions().forEach(tp ->
+ {
+ Long offset = offsetConsumer.position(tp);
+ log.info("New position for {}: {}", tp, offset);
+ Integer partition = tp.partition();
+ StatisticsDocument document =
+ partitionStatisticsRepository
+ .findById(partition.toString())
+ .orElse(new StatisticsDocument(partition));
+ document.offset = offset;
+ partitionStatisticsRepository.save(document);
+ });
+ offsetConsumer.unsubscribe();
+ }
+