From: Kai Moritz Date: Sat, 2 Nov 2024 10:33:53 +0000 (+0100) Subject: Refactor: DRY für state-change zu UNASSIGNED X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=20e81d81c1c14965a4ee949bb640154845f9e949;p=demos%2Fkafka%2Ftraining Refactor: DRY für state-change zu UNASSIGNED --- diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index f1675b4..3552689 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -360,15 +360,8 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener PartitionState partitionState = partitionStates[partition.partition()]; switch (partitionState) { - case RESTORING: - case ASSIGNED: - phaser.arriveAndDeregister(); - log.info( - "{} - Deregistered partie for revoked partition {}. New total number of parties: {}", - id, - partition, - phaser.getRegisteredParties()); - partitionStates[partition.partition()] = PartitionState.UNASSIGNED; + case RESTORING, ASSIGNED: + stateUnassigned(partition.partition()); break; default: case UNASSIGNED: @@ -397,6 +390,25 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener consumer.resume(List.of(messagePartition)); } + private void stateUnassigned(int partition) + { + log.info( + "{} - State-change for partition {}: {} -> UNASSIGNED", + id, + partition, + partitionStates[partition]); + + partitionStates[partition] = PartitionState.UNASSIGNED; + + phaser.arriveAndDeregister(); + log.info( + "{} - Deregistered partie for revoked partition {}. New total number of parties: {}", + id, + partition, + phaser.getRegisteredParties()); + } + + public void shutdown() throws InterruptedException { log.info("{} joining the worker-thread...", id);