+ partitions.forEach(tp ->
+ {
+ if (!tp.topic().equals(topic))
+ {
+ log.warn("Ignoring partition from unwanted topic: {}", tp);
+ return;
+ }
+
+ int partition = tp.partition();
+ long unseenOffset = offsets[partition];
+
+ log.info("Reading partition {} from {} -> {}", partition, unseenOffset, currentOffset);
+ });