Refactor: Enum `PartitionState` in `State` umbenannt
authorKai Moritz <kai@juplo.de>
Sat, 2 Nov 2024 16:11:14 +0000 (17:11 +0100)
committerKai Moritz <kai@juplo.de>
Fri, 8 Nov 2024 17:21:17 +0000 (18:21 +0100)
src/main/java/de/juplo/kafka/ExampleConsumer.java

index 3cc87bb..aa79339 100644 (file)
@@ -30,7 +30,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
   private volatile boolean running = false;
   private final Phaser phaser = new Phaser(1);
   private final Set<TopicPartition> assignedPartitions = new HashSet<>();
-  private volatile PartitionState[] partitionStates;
+  private volatile State[] partitionStates;
   private Map<String, Long>[] restoredState;
   private CounterState[] counterState;
   private volatile long[] stateEndOffsets;
@@ -69,10 +69,10 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
       log.info("{} - Fetching PartitionInfo for topic {}", id, topic);
       int numPartitions = consumer.partitionsFor(topic).size();
       log.info("{} - Topic {} has {} partitions", id, topic, numPartitions);
-      partitionStates = new PartitionState[numPartitions];
+      partitionStates = new State[numPartitions];
       for (int i=0; i<numPartitions; i++)
       {
-        partitionStates[i] = PartitionState.UNASSIGNED;
+        partitionStates[i] = State.UNASSIGNED;
       }
       restoredState = new Map[numPartitions];
       counterState = new CounterState[numPartitions];
@@ -345,7 +345,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
 
   private void revoke(int partition)
   {
-    PartitionState partitionState = partitionStates[partition];
+    State partitionState = partitionStates[partition];
     switch (partitionState)
     {
       case RESTORING, ASSIGNED -> stateUnassigned(partition);
@@ -360,7 +360,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
       id,
       partition,
       partitionStates[partition]);
-    partitionStates[partition] = PartitionState.RESTORING;
+    partitionStates[partition] = State.RESTORING;
 
     TopicPartition messagePartition = new TopicPartition(this.topic, partition);
     log.info("{} - Pausing message partition {}", id, messagePartition);
@@ -387,7 +387,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
       partition,
       partitionStates[partition]);
 
-    partitionStates[partition] = PartitionState.ASSIGNED;
+    partitionStates[partition] = State.ASSIGNED;
 
     TopicPartition statePartition = new TopicPartition(stateTopic, partition);
     log.info("{} - Pausing state partition {}...", id, statePartition);
@@ -410,7 +410,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
 
   private void stateUnassigned(int partition)
   {
-    PartitionState oldPartitionState = partitionStates[partition];
+    State oldPartitionState = partitionStates[partition];
 
     log.info(
       "{} - State-change for partition {}: {} -> UNASSIGNED",
@@ -418,9 +418,9 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
       partition,
       oldPartitionState);
 
-    partitionStates[partition] = PartitionState.UNASSIGNED;
+    partitionStates[partition] = State.UNASSIGNED;
 
-    if (oldPartitionState == PartitionState.ASSIGNED)
+    if (oldPartitionState == State.ASSIGNED)
     {
       TopicPartition messagePartition = new TopicPartition(topic, partition);
       log.info("{} - Revoking partition {}", id, messagePartition);
@@ -445,7 +445,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
     workerThread.join();
   }
 
-  enum PartitionState
+  enum State
   {
     UNASSIGNED,
     RESTORING,