From 62b18d687bf06b45f32cf02c8fca724624389832 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 2 Nov 2024 10:21:56 +0100 Subject: [PATCH] =?utf8?q?Refactor:=20Logik=20f=C3=BCr=20Counter=20in=20Kl?= =?utf8?q?asse=20`CounterState`=20extrahiert?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- .../java/de/juplo/kafka/CounterState.java | 26 +++++++++++++++++++ .../java/de/juplo/kafka/ExampleConsumer.java | 10 +++---- 2 files changed, 31 insertions(+), 5 deletions(-) create mode 100644 src/main/java/de/juplo/kafka/CounterState.java diff --git a/src/main/java/de/juplo/kafka/CounterState.java b/src/main/java/de/juplo/kafka/CounterState.java new file mode 100644 index 0000000..5e39371 --- /dev/null +++ b/src/main/java/de/juplo/kafka/CounterState.java @@ -0,0 +1,26 @@ +package de.juplo.kafka; + +import java.util.HashMap; +import java.util.Map; + + +public class CounterState +{ + private final Map counterState = new HashMap<>(); + + + public void setCounterState(String key, long counter) + { + counterState.put(key, counter); + } + + public synchronized Long addToCounter(String key) + { + return counterState.compute(key, (k, v) -> v == null ? 1l : v + 1); + } + + public synchronized Map getCounterState() + { + return counterState; + } +} diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index 90402f8..0b57140 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -24,7 +24,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener private final Thread workerThread; private final Runnable closeCallback; - private final Map counterState = new HashMap<>(); + private final CounterState counterState = new CounterState(); private final String stateTopic; private final Producer producer; @@ -174,7 +174,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener String key, String value) { - counterState.put(key, Long.parseLong(value)); + counterState.setCounterState(key, Long.parseLong(value)); if (offset + 1 == stateEndOffsets[partition]) { log.info( @@ -214,12 +214,12 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener private synchronized Long computeCount(String key) { - return counterState.compute(key, (k, v) -> v == null ? 1l : v + 1); + return counterState.addToCounter(key); } - public synchronized Map getCounterState() + public Map getCounterState() { - return counterState; + return counterState.getCounterState(); } void sendCounterState(int partition, String key, Long counter) -- 2.20.1