From 56f2f551ac03b50814675c07004df6036378ce9d Mon Sep 17 00:00:00 2001
From: Kai Moritz <kai@juplo.de>
Date: Sat, 2 Nov 2024 17:05:23 +0100
Subject: [PATCH] =?utf8?q?Refactor:=20Zustand=20muss=20`CounterState`=20vo?=
 =?utf8?q?llst=C3=A4ndig=20=C3=BCbergeben=20werden?=
MIME-Version: 1.0
Content-Type: text/plain; charset=utf8
Content-Transfer-Encoding: 8bit

---
 src/main/java/de/juplo/kafka/CounterState.java    | 10 ++++------
 src/main/java/de/juplo/kafka/ExampleConsumer.java | 12 ++++++++----
 2 files changed, 12 insertions(+), 10 deletions(-)

diff --git a/src/main/java/de/juplo/kafka/CounterState.java b/src/main/java/de/juplo/kafka/CounterState.java
index 5e393712..715a97a1 100644
--- a/src/main/java/de/juplo/kafka/CounterState.java
+++ b/src/main/java/de/juplo/kafka/CounterState.java
@@ -1,18 +1,16 @@
 package de.juplo.kafka;
 
+import lombok.RequiredArgsConstructor;
+
 import java.util.HashMap;
 import java.util.Map;
 
 
+@RequiredArgsConstructor
 public class CounterState
 {
-  private final Map<String, Long> counterState = new HashMap<>();
-
+  private final Map<String, Long> counterState;
 
-  public void setCounterState(String key, long counter)
-  {
-    counterState.put(key, counter);
-  }
 
   public synchronized Long addToCounter(String key)
   {
diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java
index 7f1a7bce..59b0d127 100644
--- a/src/main/java/de/juplo/kafka/ExampleConsumer.java
+++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java
@@ -31,6 +31,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
   private final Phaser phaser = new Phaser(1);
   private final Set<TopicPartition> assignedPartitions = new HashSet<>();
   private volatile PartitionState[] partitionStates;
+  private Map<String, Long>[] restoredState;
   private CounterState[] counterState;
   private volatile long[] stateEndOffsets;
   private volatile int[] seen;
@@ -73,6 +74,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
       {
         partitionStates[i] = PartitionState.UNASSIGNED;
       }
+      restoredState = new Map[numPartitions];
       counterState = new CounterState[numPartitions];
       stateEndOffsets = new long[numPartitions];
       seen = new int[numPartitions];
@@ -175,7 +177,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
     String key,
     String value)
   {
-    counterState[partition].setCounterState(key, Long.parseLong(value));
+    restoredState[partition].put(key, Long.parseLong(value));
     if (offset + 1 == stateEndOffsets[partition])
     {
       log.info("{} - Restoring of state for partition {} done!", id, partition);
@@ -311,8 +313,6 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
 
   private void restoreAndAssign(int partition)
   {
-    counterState[partition] = new CounterState();
-
     TopicPartition statePartition = new TopicPartition(this.stateTopic, partition);
 
     long stateEndOffset = consumer
@@ -337,7 +337,8 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
     }
     else
     {
-      log.info("{} - State is up-to-date for partition {}", id, partition);
+      log.info("{} - No state available for partition {}", id, partition);
+      restoredState[partition] = new HashMap<>();
       stateAssigned(partition);
     }
   }
@@ -373,6 +374,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
       statePartition);
     consumer.seek(statePartition, stateBeginningOffset);
     stateEndOffsets[partition] = stateEndOffset;
+    restoredState[partition] = new HashMap<>();
     log.info("{} - Resuming state partition {}", id, statePartition);
     consumer.resume(List.of(statePartition));
   }
@@ -390,6 +392,8 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
     TopicPartition statePartition = new TopicPartition(stateTopic, partition);
     log.info("{} - Pausing state partition {}...", id, statePartition);
     consumer.pause(List.of(statePartition));
+    counterState[partition] = new CounterState(restoredState[partition]);
+    restoredState[partition] = null;
 
     TopicPartition messagePartition = new TopicPartition(topic, partition);
     log.info("{} - Adding partition {} to the assigned partitions", id, messagePartition);
-- 
2.20.1