PartitionState partitionState = partitionStates[partition.partition()];
switch (partitionState)
{
- case RESTORING:
- case ASSIGNED:
- phaser.arriveAndDeregister();
- log.info(
- "{} - Deregistered party for revoked partition {}. New total number of parties: {}",
- id,
- partition,
- phaser.getRegisteredParties());
- partitionStates[partition.partition()] = PartitionState.UNASSIGNED;
+ case RESTORING, ASSIGNED:
+ stateUnassigned(partition.partition());
break;
default:
case UNASSIGNED:
consumer.resume(List.of(messagePartition));
}
+ private void stateUnassigned(int partition)
+ {
+ log.info(
+ "{} - State-change for partition {}: {} -> UNASSIGNED",
+ id,
+ partition,
+ partitionStates[partition]);
+
+ partitionStates[partition] = PartitionState.UNASSIGNED;
+
+ phaser.arriveAndDeregister();
+ log.info(
+ "{} - Deregistered party for revoked partition {}. New total number of parties: {}",
+ id,
+ partition,
+ phaser.getRegisteredParties());
+ }
+
+
public void shutdown() throws InterruptedException
{
log.info("{} joining the worker-thread...", id);