ConsumerRecords<String, String> records =
consumer.poll(Duration.ofSeconds(1));
- log.info("{} - Received {} messages", id, records.count());
+ int phase = phaser.getPhase();
+
+ log.info("{} - Received {} messages in phase {}", id, records.count(), phase);
records
.partitions()
.forEach(partition ->
done[partition.partition()] = true;
});
- int phase = phaser.arriveAndAwaitAdvance();
- log.info("{} - Phase {} is done!", id, phase);
+ int arrivedPhase = phaser.arriveAndAwaitAdvance();
+ log.info("{} - Phase {} is done! Next phase: {}", id, phase, arrivedPhase);
}
}
catch(WakeupException e)