+ {
+ for (Map.Entry<TopicPartition, Long> entry : consumer.endOffsets(partitions).entrySet())
+ {
+ TopicPartition topicPartition = entry.getKey();
+ Integer partition = topicPartition.partition();
+ long offset = repository.activatePartition(partition);
+ log.info("activated partition {}, seeking to offset {}", partition, offset);
+ consumer.seek(topicPartition, offset);
+ Long endOffset = entry.getValue();
+ if (offset < endOffset)
+ {
+ log.info("--> starting restore of partition {}: {} -> {}", partition, offset, endOffset);
+ recordHandlers[partition] = new RestoreRecordHandler(endOffset);
+ }
+ else
+ {
+ log.info("--> partition {} is up-to-date, offset: {}", partition, offset);
+ recordHandlers[partition] = productionRecordHandler;
+ }
+ }
+ }