Refactor: Logik für Counter in Klasse `CounterState` extrahiert
authorKai Moritz <kai@juplo.de>
Sat, 2 Nov 2024 09:21:56 +0000 (10:21 +0100)
committerKai Moritz <kai@juplo.de>
Sun, 10 Nov 2024 13:27:08 +0000 (14:27 +0100)
src/main/java/de/juplo/kafka/CounterState.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/ExampleConsumer.java

diff --git a/src/main/java/de/juplo/kafka/CounterState.java b/src/main/java/de/juplo/kafka/CounterState.java
new file mode 100644 (file)
index 0000000..5e39371
--- /dev/null
@@ -0,0 +1,26 @@
+package de.juplo.kafka;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+public class CounterState
+{
+  private final Map<String, Long> counterState = new HashMap<>();
+
+
+  public void setCounterState(String key, long counter)
+  {
+    counterState.put(key, counter);
+  }
+
+  public synchronized Long addToCounter(String key)
+  {
+    return counterState.compute(key, (k, v) -> v == null ? 1l : v + 1);
+  }
+
+  public synchronized Map<String, Long> getCounterState()
+  {
+    return counterState;
+  }
+}
index 90402f8..0b57140 100644 (file)
@@ -24,7 +24,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
   private final Thread workerThread;
   private final Runnable closeCallback;
 
-  private final Map<String, Long> counterState = new HashMap<>();
+  private final CounterState counterState = new CounterState();
   private final String stateTopic;
   private final Producer<String, String> producer;
 
@@ -174,7 +174,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
     String key,
     String value)
   {
-    counterState.put(key, Long.parseLong(value));
+    counterState.setCounterState(key, Long.parseLong(value));
     if (offset + 1 == stateEndOffsets[partition])
     {
       log.info(
@@ -214,12 +214,12 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
 
   private synchronized Long computeCount(String key)
   {
-    return counterState.compute(key, (k, v) -> v == null ? 1l : v + 1);
+    return counterState.addToCounter(key);
   }
 
-  public synchronized Map<String, Long> getCounterState()
+  public Map<String, Long> getCounterState()
   {
-    return counterState;
+    return counterState.getCounterState();
   }
 
   void sendCounterState(int partition, String key, Long counter)