Der Consumer zählt, wie oft die Schlüssel auftreten
authorKai Moritz <kai@juplo.de>
Sun, 27 Oct 2024 11:03:06 +0000 (12:03 +0100)
committerKai Moritz <kai@juplo.de>
Sun, 10 Nov 2024 13:26:07 +0000 (14:26 +0100)
README.sh
docker/docker-compose.yml
pom.xml
src/main/java/de/juplo/kafka/ExampleConsumer.java

index b46e235..cf72e51 100755 (executable)
--- a/README.sh
+++ b/README.sh
@@ -1,6 +1,6 @@
 #!/bin/bash
 
-IMAGE=juplo/spring-consumer:1.1-SNAPSHOT
+IMAGE=juplo/spring-consumer:1.1-log-compaction-SNAPSHOT
 
 if [ "$1" = "cleanup" ]
 then
index 6bd2766..8a9173a 100644 (file)
@@ -199,7 +199,7 @@ services:
       juplo.producer.throttle-ms: 100
 
   consumer:
-    image: juplo/spring-consumer:1.1-SNAPSHOT
+    image: juplo/spring-consumer:1.1-log-compaction-SNAPSHOT
     environment:
       juplo.bootstrap-server: kafka:9092
       juplo.client-id: consumer
diff --git a/pom.xml b/pom.xml
index 98a0a36..9136018 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -15,7 +15,7 @@
   <artifactId>spring-consumer</artifactId>
   <name>Spring Consumer</name>
   <description>Super Simple Consumer-Group, that is implemented as Spring-Boot application and configured by Spring Kafka</description>
-  <version>1.1-SNAPSHOT</version>
+  <version>1.1-log-compaction-SNAPSHOT</version>
 
   <properties>
     <java.version>21</java.version>
index f832b45..f6fb7a5 100644 (file)
@@ -8,6 +8,8 @@ import org.apache.kafka.common.errors.WakeupException;
 
 import java.time.Duration;
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
 
 
 @Slf4j
@@ -19,6 +21,8 @@ public class ExampleConsumer implements Runnable
   private final Thread workerThread;
   private final Runnable closeCallback;
 
+  private final Map<Integer, Long> counterState = new HashMap<>();
+
   private volatile boolean running = false;
   private long consumed = 0;
 
@@ -94,6 +98,9 @@ public class ExampleConsumer implements Runnable
   {
     consumed++;
     log.info("{} - {}: {}/{} - {}={}", id, offset, topic, partition, key, value);
+    Integer counted = Integer.parseInt(key);
+    Long counter = counterState.compute(counted, (k, v) -> v == null ? 1l : v + 1);
+    log.info("{} - current value for counter {}: {}", id, counted, counter);
   }