Refactor: DRY für state-change zu UNASSIGNED
authorKai Moritz <kai@juplo.de>
Sat, 2 Nov 2024 10:33:53 +0000 (11:33 +0100)
committerKai Moritz <kai@juplo.de>
Sun, 10 Nov 2024 13:27:08 +0000 (14:27 +0100)
src/main/java/de/juplo/kafka/ExampleConsumer.java

index b22f5cc..e687a36 100644 (file)
@@ -368,15 +368,8 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
         PartitionState partitionState = partitionStates[partition.partition()];
         switch (partitionState)
         {
-          case RESTORING:
-          case ASSIGNED:
-            phaser.arriveAndDeregister();
-            log.info(
-              "{} - Deregistered party for revoked partition {}. New total number of parties: {}",
-              id,
-              partition,
-              phaser.getRegisteredParties());
-            partitionStates[partition.partition()] = PartitionState.UNASSIGNED;
+          case RESTORING, ASSIGNED:
+            stateUnassigned(partition.partition());
             break;
           default:
           case UNASSIGNED:
@@ -405,6 +398,25 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
     consumer.resume(List.of(messagePartition));
   }
 
+  private void stateUnassigned(int partition)
+  {
+    log.info(
+      "{} - State-change for partition {}: {} -> UNASSIGNED",
+      id,
+      partition,
+      partitionStates[partition]);
+
+    partitionStates[partition] = PartitionState.UNASSIGNED;
+
+    phaser.arriveAndDeregister();
+    log.info(
+      "{} - Deregistered party for revoked partition {}. New total number of parties: {}",
+      id,
+      partition,
+      phaser.getRegisteredParties());
+  }
+
+
   public void shutdown() throws InterruptedException
   {
     log.info("{} joining the worker-thread...", id);