From f11f0dadc74a1235a56510101e6a5d32cf951d0a Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 2 Nov 2024 10:21:56 +0100 Subject: [PATCH] =?utf8?q?Refactor:=20Logik=20f=C3=BCr=20Counter=20in=20Kl?= =?utf8?q?asse=20`CounterState`=20extrahiert?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- .../java/de/juplo/kafka/CounterState.java | 26 +++++++++++++++++++ .../java/de/juplo/kafka/ExampleConsumer.java | 10 +++---- 2 files changed, 31 insertions(+), 5 deletions(-) create mode 100644 src/main/java/de/juplo/kafka/CounterState.java diff --git a/src/main/java/de/juplo/kafka/CounterState.java b/src/main/java/de/juplo/kafka/CounterState.java new file mode 100644 index 0000000..5e39371 --- /dev/null +++ b/src/main/java/de/juplo/kafka/CounterState.java @@ -0,0 +1,26 @@ +package de.juplo.kafka; + +import java.util.HashMap; +import java.util.Map; + + +public class CounterState +{ + private final Map counterState = new HashMap<>(); + + + public void setCounterState(String key, long counter) + { + counterState.put(key, counter); + } + + public synchronized Long addToCounter(String key) + { + return counterState.compute(key, (k, v) -> v == null ? 1l : v + 1); + } + + public synchronized Map getCounterState() + { + return counterState; + } +} diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index b90cb77..cb80710 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -23,7 +23,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener private final Consumer consumer; private final Thread workerThread; - private final Map counterState = new HashMap<>(); + private final CounterState counterState = new CounterState(); private final String stateTopic; Producer producer; @@ -166,7 +166,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener String key, String value) { - counterState.put(key, Long.parseLong(value)); + counterState.setCounterState(key, Long.parseLong(value)); if (offset + 1 == stateEndOffsets[partition]) { log.info( @@ -206,12 +206,12 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener private synchronized Long computeCount(String key) { - return counterState.compute(key, (k, v) -> v == null ? 1l : v + 1); + return counterState.addToCounter(key); } - public synchronized Map getCounterState() + public Map getCounterState() { - return counterState; + return counterState.getCounterState(); } void sendCounterState(int partition, String key, Long counter) -- 2.20.1