From: Kai Moritz Date: Sun, 27 Oct 2024 11:03:06 +0000 (+0100) Subject: Der Consumer zählt, wie oft die Schlüssel auftreten X-Git-Tag: consumer/spring-consumer--log-compaction--2025-02-signal~26 X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=dfd16558b4f8e60d9ef8a5950a0f3007e93e594f;p=demos%2Fkafka%2Ftraining Der Consumer zählt, wie oft die Schlüssel auftreten --- diff --git a/README.sh b/README.sh index b46e2350..cf72e513 100755 --- a/README.sh +++ b/README.sh @@ -1,6 +1,6 @@ #!/bin/bash -IMAGE=juplo/spring-consumer:1.1-SNAPSHOT +IMAGE=juplo/spring-consumer:1.1-log-compaction-SNAPSHOT if [ "$1" = "cleanup" ] then diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 4fa2eade..a36006fd 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -145,7 +145,7 @@ services: juplo.producer.throttle-ms: 100 consumer: - image: juplo/spring-consumer:1.1-SNAPSHOT + image: juplo/spring-consumer:1.1-log-compaction-SNAPSHOT environment: juplo.bootstrap-server: kafka:9092 juplo.client-id: consumer diff --git a/pom.xml b/pom.xml index dd96d00f..4d29574e 100644 --- a/pom.xml +++ b/pom.xml @@ -15,7 +15,7 @@ spring-consumer Spring Consumer Super Simple Consumer-Group, that is implemented as Spring-Boot application and configured by Spring Kafka - 1.1-SNAPSHOT + 1.1-log-compaction-SNAPSHOT 21 diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index 79382ef9..64a3396d 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -8,6 +8,8 @@ import org.apache.kafka.common.errors.WakeupException; import java.time.Duration; import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; @Slf4j @@ -19,6 +21,8 @@ public class ExampleConsumer implements Runnable private final Thread workerThread; private final Runnable closeCallback; + private final Map counterState = new HashMap<>(); + private volatile boolean running = false; private long consumed = 0; @@ -94,6 +98,9 @@ public class ExampleConsumer implements Runnable { consumed++; log.info("{} - partition={}-{}, offset={}: {}={}", id, topic, partition, offset, key, value); + Integer counted = Integer.parseInt(key); + Long counter = counterState.compute(counted, (k, v) -> v == null ? 1l : v + 1); + log.info("{} - current value for counter {}: {}", id, counted, counter); }