Der Consumer zählt jetzt die Nachrichten pro Key für jedes Topic
authorKai Moritz <kai@juplo.de>
Fri, 1 Apr 2022 20:02:28 +0000 (22:02 +0200)
committerKai Moritz <kai@juplo.de>
Fri, 1 Apr 2022 22:21:18 +0000 (00:21 +0200)
* Die Ergebnisse werden beim Beenden des Consumer ausgegeben
* Wenn der Consumer neu gestartet wird, werden die Ergebnisse zurückgesetzt
* Über /seen können Zwischenstände abgefragt werden

README.sh
docker-compose.yml
pom.xml
src/main/java/de/juplo/kafka/DriverController.java
src/main/java/de/juplo/kafka/EndlessConsumer.java

index 900270a..c57a29b 100755 (executable)
--- a/README.sh
+++ b/README.sh
@@ -1,6 +1,6 @@
 #!/bin/bash
 
-IMAGE=juplo/endless-consumer:1.0-SNAPSHOT
+IMAGE=juplo/counting-consumer:1.0-SNAPSHOT
 
 if [ "$1" = "cleanup" ]
 then
@@ -26,6 +26,16 @@ echo "Waiting for the Kafka-Cluster to become ready..."
 docker-compose exec cli cub kafka-ready -b kafka:9092 1 60 > /dev/null 2>&1 || exit 1
 docker-compose up setup
 docker-compose up -d producer consumer
-sleep 15
+sleep 10
+http :8081/seen
+sleep 1
+http :8081/seen
+sleep 1
+http :8081/seen
+sleep 1
+http :8081/seen
+sleep 1
+http :8081/seen
+sleep 1
 docker-compose stop producer consumer
 docker-compose logs consumer
index bed1fc1..8c995e7 100644 (file)
@@ -48,7 +48,7 @@ services:
 
 
   consumer:
-    image: juplo/endless-consumer:1.0-SNAPSHOT
+    image: juplo/counting-consumer:1.0-SNAPSHOT
     ports:
       - 8081:8081
     environment:
diff --git a/pom.xml b/pom.xml
index 54bb695..3467bc7 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -12,9 +12,9 @@
   </parent>
 
   <groupId>de.juplo.kafka</groupId>
-  <artifactId>endless-consumer</artifactId>
+  <artifactId>counting-consumer</artifactId>
   <version>1.0-SNAPSHOT</version>
-  <name>Endless Consumer: a Simple Consumer-Group that reads and print the topic</name>
+  <name>Endless Consumer: a Simple Consumer-Group that reads and prints the topic and counts the received messages for each key by topic</name>
 
   <dependencies>
     <dependency>
index d8068e5..a504842 100644 (file)
@@ -1,9 +1,11 @@
 package de.juplo.kafka;
 
 import lombok.RequiredArgsConstructor;
+import org.springframework.web.bind.annotation.GetMapping;
 import org.springframework.web.bind.annotation.PostMapping;
 import org.springframework.web.bind.annotation.RestController;
 
+import java.util.Map;
 import java.util.concurrent.ExecutionException;
 
 
@@ -25,4 +27,11 @@ public class DriverController
   {
     consumer.stop();
   }
+
+
+  @GetMapping("seen")
+  public Map<Integer, Map<String, Integer>> seen()
+  {
+    return consumer.getSeen();
+  }
 }
index b3dd446..063a09e 100644 (file)
@@ -10,6 +10,8 @@ import org.apache.kafka.common.serialization.StringDeserializer;
 import javax.annotation.PreDestroy;
 import java.time.Duration;
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -32,6 +34,9 @@ public class EndlessConsumer implements Runnable
   private KafkaConsumer<String, String> consumer = null;
   private Future<?> future = null;
 
+  private Map<Integer, Map<String, Integer>> seen;
+
+
   public EndlessConsumer(
       ExecutorService executor,
       String bootstrapServer,
@@ -66,6 +71,8 @@ public class EndlessConsumer implements Runnable
       log.info("{} - Subscribing to topic {}", id, topic);
       consumer.subscribe(Arrays.asList(topic));
 
+      seen = new HashMap<>();
+
       while (true)
       {
         ConsumerRecords<String, String> records =
@@ -85,6 +92,21 @@ public class EndlessConsumer implements Runnable
               record.key(),
               record.value()
           );
+
+          Integer partition = record.partition();
+          String key = record.key();
+
+          if (!seen.containsKey(partition))
+            seen.put(partition, new HashMap<>());
+
+          Map<String, Integer> byKey = seen.get(partition);
+
+          if (!byKey.containsKey(key))
+            byKey.put(key, 0);
+
+          int seenByKey = byKey.get(key);
+          seenByKey++;
+          byKey.put(key, seenByKey);
         }
       }
     }
@@ -101,10 +123,30 @@ public class EndlessConsumer implements Runnable
     {
       log.info("{} - Closing the KafkaConsumer", id);
       consumer.close();
+
+      for (Integer partition : seen.keySet())
+      {
+        Map<String, Integer> byKey = seen.get(partition);
+        for (String key : byKey.keySet())
+        {
+          log.info(
+              "{} - Seen {} messages for partition={}|key={}",
+              id,
+              byKey.get(key),
+              partition,
+              key);
+        }
+      }
+      seen = null;
+
       log.info("{} - Consumer-Thread exiting", id);
     }
   }
 
+  public Map<Integer, Map<String, Integer>> getSeen()
+  {
+    return seen;
+  }
 
   public synchronized void start()
   {