From b118d961594083c7404bbd8c6ef70bf642158b70 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 27 Oct 2024 12:03:06 +0100 Subject: [PATCH] =?utf8?q?Der=20Consumer=20z=C3=A4hlt,=20wie=20oft=20die?= =?utf8?q?=20Schl=C3=BCssel=20auftreten?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- src/main/java/de/juplo/kafka/ExampleConsumer.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index 59da60b..85b77b9 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -8,6 +8,8 @@ import org.apache.kafka.common.errors.WakeupException; import java.time.Duration; import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; @Slf4j @@ -19,6 +21,8 @@ public class ExampleConsumer implements Runnable private final Thread workerThread; private final Runnable closeCallback; + private final Map counterState = new HashMap<>(); + private volatile boolean running = false; private long consumed = 0; @@ -94,6 +98,9 @@ public class ExampleConsumer implements Runnable { consumed++; log.info("{} - {}: {}/{} - {}={}", id, offset, topic, partition, key, value); + Integer counted = Integer.parseInt(key); + Long counter = counterState.compute(counted, (k, v) -> v == null ? 1l : v + 1); + log.info("{} - current value for counter {}: {}", id, counted, counter); } -- 2.20.1