Refactor: Logik für Counter in Klasse `CounterState` extrahiert
authorKai Moritz <kai@juplo.de>
Sat, 2 Nov 2024 09:21:56 +0000 (10:21 +0100)
committerKai Moritz <kai@juplo.de>
Sat, 2 Nov 2024 15:06:58 +0000 (16:06 +0100)
src/main/java/de/juplo/kafka/CounterState.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/ExampleConsumer.java

diff --git a/src/main/java/de/juplo/kafka/CounterState.java b/src/main/java/de/juplo/kafka/CounterState.java
new file mode 100644 (file)
index 0000000..5e39371
--- /dev/null
@@ -0,0 +1,26 @@
+package de.juplo.kafka;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+public class CounterState
+{
+  private final Map<String, Long> counterState = new HashMap<>();
+
+
+  public void setCounterState(String key, long counter)
+  {
+    counterState.put(key, counter);
+  }
+
+  public synchronized Long addToCounter(String key)
+  {
+    return counterState.compute(key, (k, v) -> v == null ? 1l : v + 1);
+  }
+
+  public synchronized Map<String, Long> getCounterState()
+  {
+    return counterState;
+  }
+}
index b90cb77..cb80710 100644 (file)
@@ -23,7 +23,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
   private final Consumer<String, String> consumer;
   private final Thread workerThread;
 
-  private final Map<String, Long> counterState = new HashMap<>();
+  private final CounterState counterState = new CounterState();
   private final String stateTopic;
   Producer<String, String> producer;
 
@@ -166,7 +166,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
     String key,
     String value)
   {
-    counterState.put(key, Long.parseLong(value));
+    counterState.setCounterState(key, Long.parseLong(value));
     if (offset + 1 == stateEndOffsets[partition])
     {
       log.info(
@@ -206,12 +206,12 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
 
   private synchronized Long computeCount(String key)
   {
-    return counterState.compute(key, (k, v) -> v == null ? 1l : v + 1);
+    return counterState.addToCounter(key);
   }
 
-  public synchronized Map<String, Long> getCounterState()
+  public Map<String, Long> getCounterState()
   {
-    return counterState;
+    return counterState.getCounterState();
   }
 
   void sendCounterState(int partition, String key, Long counter)