From: Kai Moritz Date: Sat, 2 Nov 2024 09:21:56 +0000 (+0100) Subject: Refactor: Logik für Counter in Klasse `CounterState` extrahiert X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=f11f0dadc74a1235a56510101e6a5d32cf951d0a;p=demos%2Fkafka%2Ftraining Refactor: Logik für Counter in Klasse `CounterState` extrahiert --- diff --git a/src/main/java/de/juplo/kafka/CounterState.java b/src/main/java/de/juplo/kafka/CounterState.java new file mode 100644 index 0000000..5e39371 --- /dev/null +++ b/src/main/java/de/juplo/kafka/CounterState.java @@ -0,0 +1,26 @@ +package de.juplo.kafka; + +import java.util.HashMap; +import java.util.Map; + + +public class CounterState +{ + private final Map counterState = new HashMap<>(); + + + public void setCounterState(String key, long counter) + { + counterState.put(key, counter); + } + + public synchronized Long addToCounter(String key) + { + return counterState.compute(key, (k, v) -> v == null ? 1l : v + 1); + } + + public synchronized Map getCounterState() + { + return counterState; + } +} diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index b90cb77..cb80710 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -23,7 +23,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener private final Consumer consumer; private final Thread workerThread; - private final Map counterState = new HashMap<>(); + private final CounterState counterState = new CounterState(); private final String stateTopic; Producer producer; @@ -166,7 +166,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener String key, String value) { - counterState.put(key, Long.parseLong(value)); + counterState.setCounterState(key, Long.parseLong(value)); if (offset + 1 == stateEndOffsets[partition]) { log.info( @@ -206,12 +206,12 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener private synchronized Long computeCount(String key) { - return counterState.compute(key, (k, v) -> v == null ? 1l : v + 1); + return counterState.addToCounter(key); } - public synchronized Map getCounterState() + public Map getCounterState() { - return counterState; + return counterState.getCounterState(); } void sendCounterState(int partition, String key, Long counter)