From a31290ca5b93f00955726082b213bfe175392112 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 2 Nov 2024 13:37:38 +0100 Subject: [PATCH] =?utf8?q?DRY=20f=C3=BCr=20state-change=20zu=20RESTORING?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- .../java/de/juplo/kafka/ExampleConsumer.java | 32 +++++++++++-------- 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index 4179331..3f00a08 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -289,20 +289,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener { log.info("{} - Adding partition {}", id, partition); assignedPartitions.add(partition); - - phaser.register(); - log.info( - "{} - Registered new partie for restored assigned partition {}. New total number of parties: {}", - id, - partition, - phaser.getRegisteredParties()); - - log.info( - "{} - Changing partition-state for {}: {} -> RESTORING", - id, - partition, - partitionStates[partition.partition()]); - partitionStates[partition.partition()] = PartitionState.RESTORING; + stateRestoring(partition.partition()); } else { @@ -367,6 +354,23 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener }); } + private void stateRestoring(int partition) + { + log.info( + "{} - Changing partition-state for {}: {} -> RESTORING", + id, + partition, + partitionStates[partition]); + partitionStates[partition] = PartitionState.RESTORING; + + phaser.register(); + log.info( + "{} - Registered new partie for newly assigned partition {}. New total number of parties: {}", + id, + partition, + phaser.getRegisteredParties()); + } + private void stateAssigned(int partition) { log.info( -- 2.20.1