public void onPartitionsAssigned(Collection<TopicPartition> partitions)
{
phaser.bulkRegister(partitions.size());
+ log.info(
+ "{} - Added {} parties for newly assigned partitions. New total number of parties: {}",
+ id,
+ partitions.size(),
+ phaser.getRegisteredParties());
}
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions)
{
partitions.forEach(partition -> phaser.arriveAndDeregister());
+ log.info(
+ "{} - Removed {} parties for revoked partitions. New total number of parties: {}",
+ id,
+ partitions.size(),
+ phaser.getRegisteredParties());
}
public void shutdown() throws InterruptedException