Refactor: Zustand muss `CounterState` vollständig übergeben werden
authorKai Moritz <kai@juplo.de>
Sat, 2 Nov 2024 16:05:23 +0000 (17:05 +0100)
committerKai Moritz <kai@juplo.de>
Sun, 10 Nov 2024 13:27:08 +0000 (14:27 +0100)
src/main/java/de/juplo/kafka/CounterState.java
src/main/java/de/juplo/kafka/ExampleConsumer.java

index 5e39371..715a97a 100644 (file)
@@ -1,18 +1,16 @@
 package de.juplo.kafka;
 
+import lombok.RequiredArgsConstructor;
+
 import java.util.HashMap;
 import java.util.Map;
 
 
+@RequiredArgsConstructor
 public class CounterState
 {
-  private final Map<String, Long> counterState = new HashMap<>();
-
+  private final Map<String, Long> counterState;
 
-  public void setCounterState(String key, long counter)
-  {
-    counterState.put(key, counter);
-  }
 
   public synchronized Long addToCounter(String key)
   {
index 4e23462..490fbdc 100644 (file)
@@ -31,6 +31,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
   private final Phaser phaser = new Phaser(1);
   private final Set<TopicPartition> assignedPartitions = new HashSet<>();
   private volatile PartitionState[] partitionStates;
+  private Map<String, Long>[] restoredState;
   private CounterState[] counterState;
   private volatile long[] stateEndOffsets;
   private volatile int[] seen;
@@ -73,6 +74,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
       {
         partitionStates[i] = PartitionState.UNASSIGNED;
       }
+      restoredState = new Map[numPartitions];
       counterState = new CounterState[numPartitions];
       stateEndOffsets = new long[numPartitions];
       seen = new int[numPartitions];
@@ -175,7 +177,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
     String key,
     String value)
   {
-    counterState[partition].setCounterState(key, Long.parseLong(value));
+    restoredState[partition].put(key, Long.parseLong(value));
     if (offset + 1 == stateEndOffsets[partition])
     {
       log.info("{} - Restoring of state for partition {} done!", id, partition);
@@ -311,8 +313,6 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
 
   private void restoreAndAssign(int partition)
   {
-    counterState[partition] = new CounterState();
-
     TopicPartition statePartition = new TopicPartition(this.stateTopic, partition);
 
     long stateEndOffset = consumer
@@ -337,7 +337,8 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
     }
     else
     {
-      log.info("{} - State is up-to-date for partition {}", id, partition);
+      log.info("{} - No state available for partition {}", id, partition);
+      restoredState[partition] = new HashMap<>();
       stateAssigned(partition);
     }
   }
@@ -373,6 +374,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
       statePartition);
     consumer.seek(statePartition, stateBeginningOffset);
     stateEndOffsets[partition] = stateEndOffset;
+    restoredState[partition] = new HashMap<>();
     log.info("{} - Resuming state partition {}", id, statePartition);
     consumer.resume(List.of(statePartition));
   }
@@ -390,6 +392,8 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
     TopicPartition statePartition = new TopicPartition(stateTopic, partition);
     log.info("{} - Pausing state partition {}...", id, statePartition);
     consumer.pause(List.of(statePartition));
+    counterState[partition] = new CounterState(restoredState[partition]);
+    restoredState[partition] = null;
 
     TopicPartition messagePartition = new TopicPartition(topic, partition);
     log.info("{} - Adding partition {} to the assigned partitions", id, messagePartition);