From 3f3a00a56c7329084b47b133243b41045f56d583 Mon Sep 17 00:00:00 2001
From: Kai Moritz <kai@juplo.de>
Date: Sat, 2 Nov 2024 13:37:38 +0100
Subject: [PATCH] =?utf8?q?DRY=20f=C3=BCr=20state-change=20zu=20RESTORING?=
MIME-Version: 1.0
Content-Type: text/plain; charset=utf8
Content-Transfer-Encoding: 8bit

---
 .../java/de/juplo/kafka/ExampleConsumer.java  | 32 +++++++++++--------
 1 file changed, 18 insertions(+), 14 deletions(-)

diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java
index 6284c661..6165abc4 100644
--- a/src/main/java/de/juplo/kafka/ExampleConsumer.java
+++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java
@@ -297,20 +297,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
       {
         log.info("{} - Adding partition {}", id, partition);
         assignedPartitions.add(partition);
-
-        phaser.register();
-        log.info(
-          "{} - Registered new party for restored assigned partition {}. New total number of parties: {}",
-          id,
-          partition,
-          phaser.getRegisteredParties());
-
-        log.info(
-          "{} - Changing partition-state for {}: {} -> RESTORING",
-          id,
-          partition,
-          partitionStates[partition.partition()]);
-        partitionStates[partition.partition()] = PartitionState.RESTORING;
+        stateRestoring(partition.partition());
       }
       else
       {
@@ -375,6 +362,23 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
     });
   }
 
+  private void stateRestoring(int partition)
+  {
+    log.info(
+      "{} - Changing partition-state for {}: {} -> RESTORING",
+      id,
+      partition,
+      partitionStates[partition]);
+    partitionStates[partition] = PartitionState.RESTORING;
+
+    phaser.register();
+    log.info(
+      "{} - Registered new party for newly assigned partition {}. New total number of parties: {}",
+      id,
+      partition,
+      phaser.getRegisteredParties());
+  }
+
   private void stateAssigned(int partition)
   {
     log.info(
-- 
2.20.1