Der Consumer zählt, wie oft die Schlüssel auftreten
authorKai Moritz <kai@juplo.de>
Sun, 27 Oct 2024 11:03:06 +0000 (12:03 +0100)
committerKai Moritz <kai@juplo.de>
Mon, 28 Oct 2024 13:49:58 +0000 (14:49 +0100)
src/main/java/de/juplo/kafka/ExampleConsumer.java

index 772643f..0f4ab04 100644 (file)
@@ -8,6 +8,8 @@ import org.apache.kafka.common.errors.WakeupException;
 
 import java.time.Duration;
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
 
 
 @Slf4j
@@ -18,6 +20,8 @@ public class ExampleConsumer implements Runnable
   private final Consumer<String, String> consumer;
   private final Thread workerThread;
 
+  private final Map<Integer, Long> counterState = new HashMap<>();
+
   private volatile boolean running = false;
   private long consumed = 0;
 
@@ -86,6 +90,9 @@ public class ExampleConsumer implements Runnable
   {
     consumed++;
     log.info("{} - {}: {}/{} - {}={}", id, offset, topic, partition, key, value);
+    Integer counted = Integer.parseInt(key);
+    Long counter = counterState.compute(counted, (k, v) -> v == null ? 1l : v + 1);
+    log.info("{} - current value for counter {}: {}", id, counted, counter);
   }