From: Kai Moritz Date: Sat, 2 Nov 2024 09:21:56 +0000 (+0100) Subject: Refactor: Logik für Counter in Klasse `CounterState` extrahiert X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=16bcf21e5569320610213ed3af9cbc382c00f924;p=demos%2Fkafka%2Ftraining Refactor: Logik für Counter in Klasse `CounterState` extrahiert --- diff --git a/src/main/java/de/juplo/kafka/CounterState.java b/src/main/java/de/juplo/kafka/CounterState.java new file mode 100644 index 0000000..5e39371 --- /dev/null +++ b/src/main/java/de/juplo/kafka/CounterState.java @@ -0,0 +1,26 @@ +package de.juplo.kafka; + +import java.util.HashMap; +import java.util.Map; + + +public class CounterState +{ + private final Map counterState = new HashMap<>(); + + + public void setCounterState(String key, long counter) + { + counterState.put(key, counter); + } + + public synchronized Long addToCounter(String key) + { + return counterState.compute(key, (k, v) -> v == null ? 1l : v + 1); + } + + public synchronized Map getCounterState() + { + return counterState; + } +} diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index edd55f8..b7d7e81 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -24,7 +24,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener private final Thread workerThread; private final Runnable closeCallback; - private final Map counterState = new HashMap<>(); + private final CounterState counterState = new CounterState(); private final String stateTopic; private final Producer producer; @@ -174,7 +174,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener String key, String value) { - counterState.put(key, Long.parseLong(value)); + counterState.setCounterState(key, Long.parseLong(value)); if (offset + 1 == stateEndOffsets[partition]) { log.info( @@ -214,12 +214,12 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener private synchronized Long computeCount(String key) { - return counterState.compute(key, (k, v) -> v == null ? 1l : v + 1); + return counterState.addToCounter(key); } - public synchronized Map getCounterState() + public Map getCounterState() { - return counterState; + return counterState.getCounterState(); } void sendCounterState(int partition, String key, Long counter)