DRY für state-change zu RESTORING
authorKai Moritz <kai@juplo.de>
Sat, 2 Nov 2024 12:37:38 +0000 (13:37 +0100)
committerKai Moritz <kai@juplo.de>
Sat, 2 Nov 2024 15:09:08 +0000 (16:09 +0100)
src/main/java/de/juplo/kafka/ExampleConsumer.java

index 4179331..3f00a08 100644 (file)
@@ -289,20 +289,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
       {
         log.info("{} - Adding partition {}", id, partition);
         assignedPartitions.add(partition);
-
-        phaser.register();
-        log.info(
-          "{} - Registered new partie for restored assigned partition {}. New total number of parties: {}",
-          id,
-          partition,
-          phaser.getRegisteredParties());
-
-        log.info(
-          "{} - Changing partition-state for {}: {} -> RESTORING",
-          id,
-          partition,
-          partitionStates[partition.partition()]);
-        partitionStates[partition.partition()] = PartitionState.RESTORING;
+        stateRestoring(partition.partition());
       }
       else
       {
@@ -367,6 +354,23 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
     });
   }
 
+  private void stateRestoring(int partition)
+  {
+    log.info(
+      "{} - Changing partition-state for {}: {} -> RESTORING",
+      id,
+      partition,
+      partitionStates[partition]);
+    partitionStates[partition] = PartitionState.RESTORING;
+
+    phaser.register();
+    log.info(
+      "{} - Registered new partie for newly assigned partition {}. New total number of parties: {}",
+      id,
+      partition,
+      phaser.getRegisteredParties());
+  }
+
   private void stateAssigned(int partition)
   {
     log.info(