+ consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener()
+ {
+ @Override
+ public void onPartitionsRevoked(Collection<TopicPartition> partitions)
+ {
+ partitions.forEach(tp ->
+ {
+ log.info("{} - removing partition: {}", id, tp);
+ PartitionStatistics removed = seen.remove(tp);
+ for (KeyCounter counter : removed.getStatistics())
+ {
+ log.info(
+ "{} - Seen {} messages for partition={}|key={}",
+ id,
+ counter.getResult(),
+ removed.getPartition(),
+ counter.getKey());
+ }
+ repository.save(new StatisticsDocument(removed, consumer.position(tp)));
+ });
+ }
+
+ @Override
+ public void onPartitionsAssigned(Collection<TopicPartition> partitions)
+ {
+ partitions.forEach(tp ->
+ {
+ log.info("{} - adding partition: {}", id, tp);
+ StatisticsDocument document =
+ repository
+ .findById(tp.toString())
+ .orElse(new StatisticsDocument(tp));
+ consumer.seek(tp, document.offset);
+ seen.put(tp, new PartitionStatistics(document));
+ });
+ }
+ });