DRY für state-change zu RESTORING
authorKai Moritz <kai@juplo.de>
Sat, 2 Nov 2024 12:37:38 +0000 (13:37 +0100)
committerKai Moritz <kai@juplo.de>
Sun, 10 Nov 2024 13:27:08 +0000 (14:27 +0100)
src/main/java/de/juplo/kafka/ExampleConsumer.java

index d852f41..84e5057 100644 (file)
@@ -297,20 +297,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
       {
         log.info("{} - Adding partition {}", id, partition);
         assignedPartitions.add(partition);
-
-        phaser.register();
-        log.info(
-          "{} - Registered new party for restored assigned partition {}. New total number of parties: {}",
-          id,
-          partition,
-          phaser.getRegisteredParties());
-
-        log.info(
-          "{} - Changing partition-state for {}: {} -> RESTORING",
-          id,
-          partition,
-          partitionStates[partition.partition()]);
-        partitionStates[partition.partition()] = PartitionState.RESTORING;
+        stateRestoring(partition.partition());
       }
       else
       {
@@ -375,6 +362,23 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
     });
   }
 
+  private void stateRestoring(int partition)
+  {
+    log.info(
+      "{} - Changing partition-state for {}: {} -> RESTORING",
+      id,
+      partition,
+      partitionStates[partition]);
+    partitionStates[partition] = PartitionState.RESTORING;
+
+    phaser.register();
+    log.info(
+      "{} - Registered new party for newly assigned partition {}. New total number of parties: {}",
+      id,
+      partition,
+      phaser.getRegisteredParties());
+  }
+
   private void stateAssigned(int partition)
   {
     log.info(