From: Kai Moritz Date: Sun, 27 Oct 2024 11:03:06 +0000 (+0100) Subject: Der Consumer zählt, wie oft die Schlüssel auftreten X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=d9086d16c4bef6d3ad6dc639aaba2419ab14bb8e;p=demos%2Fkafka%2Ftraining Der Consumer zählt, wie oft die Schlüssel auftreten --- diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index 772643f..0f4ab04 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -8,6 +8,8 @@ import org.apache.kafka.common.errors.WakeupException; import java.time.Duration; import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; @Slf4j @@ -18,6 +20,8 @@ public class ExampleConsumer implements Runnable private final Consumer consumer; private final Thread workerThread; + private final Map counterState = new HashMap<>(); + private volatile boolean running = false; private long consumed = 0; @@ -86,6 +90,9 @@ public class ExampleConsumer implements Runnable { consumed++; log.info("{} - {}: {}/{} - {}={}", id, offset, topic, partition, key, value); + Integer counted = Integer.parseInt(key); + Long counter = counterState.compute(counted, (k, v) -> v == null ? 1l : v + 1); + log.info("{} - current value for counter {}: {}", id, counted, counter); }