}
});
+ log.debug(
+ "{} - Waiting for {} parties in phase {}...",
+ id,
+ phaser.getRegisteredParties(),
+ phaser.getPhase());
int arrivedPhase = phaser.arriveAndAwaitAdvance();
log.info("{} - Phase {} is done! Next phase: {}", id, phase, arrivedPhase);
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions)
{
+ log.info("{} - Assigning new partitions", id);
partitions
.stream()
.filter(partition -> partition.topic().equals(topic))
.forEach(partition -> restoreAndAssign(partition.partition()));
+ log.info("{} - Assignment of new partitions done!", id);
}
@Override
public synchronized void onPartitionsRevoked(Collection<TopicPartition> partitions)
{
+ log.info("{} - Revoking owned partitions", id);
commit();
partitions
.stream()
.filter(partition -> partition.topic().equals(topic))
.forEach(partition -> revoke(partition.partition()));
+ log.info("{} - Revokation of previously owned partitions done!", id);
+ }
+
+ @Override
+ public void onPartitionsLost(Collection<TopicPartition> partitions)
+ {
+ log.info("{} - Removing lost partitions", id);
+ producer.abortTransaction();
+ partitions
+ .stream()
+ .filter(partition -> partition.topic().equals(topic))
+ .forEach(partition -> revoke(partition.partition()));
+ producer.beginTransaction();
+ log.info("{} - Removement of lost done!", id);
}
private void restoreAndAssign(int partition)