From: Kai Moritz Date: Sat, 2 Nov 2024 12:37:38 +0000 (+0100) Subject: DRY für state-change zu RESTORING X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=a31290ca5b93f00955726082b213bfe175392112;p=demos%2Fkafka%2Ftraining DRY für state-change zu RESTORING --- diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index 4179331..3f00a08 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -289,20 +289,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener { log.info("{} - Adding partition {}", id, partition); assignedPartitions.add(partition); - - phaser.register(); - log.info( - "{} - Registered new partie for restored assigned partition {}. New total number of parties: {}", - id, - partition, - phaser.getRegisteredParties()); - - log.info( - "{} - Changing partition-state for {}: {} -> RESTORING", - id, - partition, - partitionStates[partition.partition()]); - partitionStates[partition.partition()] = PartitionState.RESTORING; + stateRestoring(partition.partition()); } else { @@ -367,6 +354,23 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener }); } + private void stateRestoring(int partition) + { + log.info( + "{} - Changing partition-state for {}: {} -> RESTORING", + id, + partition, + partitionStates[partition]); + partitionStates[partition] = PartitionState.RESTORING; + + phaser.register(); + log.info( + "{} - Registered new partie for newly assigned partition {}. New total number of parties: {}", + id, + partition, + phaser.getRegisteredParties()); + } + private void stateAssigned(int partition) { log.info(