From 808bd074aaae940e5f81bc9f09c42d48d1fd2670 Mon Sep 17 00:00:00 2001
From: Kai Moritz <kai@juplo.de>
Date: Fri, 1 Apr 2022 22:02:28 +0200
Subject: [PATCH] =?utf8?q?Der=20Consumer=20z=C3=A4hlt=20jetzt=20die=20Nach?=
 =?utf8?q?richten=20pro=20Key=20f=C3=BCr=20jedes=20Topic?=
MIME-Version: 1.0
Content-Type: text/plain; charset=utf8
Content-Transfer-Encoding: 8bit

* Die Ergebnisse werden beim Beenden des Consumer ausgegeben
* Wenn der Consumer neu gestartet wird, werden die Ergebnisse zurückgesetzt
* Über /seen können Zwischenstände abgefragt werden
---
 README.sh                                     | 14 ++++++-
 docker-compose.yml                            |  2 +-
 pom.xml                                       |  4 +-
 .../java/de/juplo/kafka/DriverController.java |  9 ++++
 .../java/de/juplo/kafka/EndlessConsumer.java  | 42 +++++++++++++++++++
 5 files changed, 66 insertions(+), 5 deletions(-)

diff --git a/README.sh b/README.sh
index 900270a..c57a29b 100755
--- a/README.sh
+++ b/README.sh
@@ -1,6 +1,6 @@
 #!/bin/bash
 
-IMAGE=juplo/endless-consumer:1.0-SNAPSHOT
+IMAGE=juplo/counting-consumer:1.0-SNAPSHOT
 
 if [ "$1" = "cleanup" ]
 then
@@ -26,6 +26,16 @@ echo "Waiting for the Kafka-Cluster to become ready..."
 docker-compose exec cli cub kafka-ready -b kafka:9092 1 60 > /dev/null 2>&1 || exit 1
 docker-compose up setup
 docker-compose up -d producer consumer
-sleep 15
+sleep 10
+http :8081/seen
+sleep 1
+http :8081/seen
+sleep 1
+http :8081/seen
+sleep 1
+http :8081/seen
+sleep 1
+http :8081/seen
+sleep 1
 docker-compose stop producer consumer
 docker-compose logs consumer
diff --git a/docker-compose.yml b/docker-compose.yml
index bed1fc1..8c995e7 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -48,7 +48,7 @@ services:
 
 
   consumer:
-    image: juplo/endless-consumer:1.0-SNAPSHOT
+    image: juplo/counting-consumer:1.0-SNAPSHOT
     ports:
       - 8081:8081
     environment:
diff --git a/pom.xml b/pom.xml
index 54bb695..3467bc7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -12,9 +12,9 @@
   </parent>
 
   <groupId>de.juplo.kafka</groupId>
-  <artifactId>endless-consumer</artifactId>
+  <artifactId>counting-consumer</artifactId>
   <version>1.0-SNAPSHOT</version>
-  <name>Endless Consumer: a Simple Consumer-Group that reads and print the topic</name>
+  <name>Endless Consumer: a Simple Consumer-Group that reads and prints the topic and counts the received messages for each key by topic</name>
 
   <dependencies>
     <dependency>
diff --git a/src/main/java/de/juplo/kafka/DriverController.java b/src/main/java/de/juplo/kafka/DriverController.java
index d8068e5..a504842 100644
--- a/src/main/java/de/juplo/kafka/DriverController.java
+++ b/src/main/java/de/juplo/kafka/DriverController.java
@@ -1,9 +1,11 @@
 package de.juplo.kafka;
 
 import lombok.RequiredArgsConstructor;
+import org.springframework.web.bind.annotation.GetMapping;
 import org.springframework.web.bind.annotation.PostMapping;
 import org.springframework.web.bind.annotation.RestController;
 
+import java.util.Map;
 import java.util.concurrent.ExecutionException;
 
 
@@ -25,4 +27,11 @@ public class DriverController
   {
     consumer.stop();
   }
+
+
+  @GetMapping("seen")
+  public Map<Integer, Map<String, Integer>> seen()
+  {
+    return consumer.getSeen();
+  }
 }
diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java
index b3dd446..063a09e 100644
--- a/src/main/java/de/juplo/kafka/EndlessConsumer.java
+++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java
@@ -10,6 +10,8 @@ import org.apache.kafka.common.serialization.StringDeserializer;
 import javax.annotation.PreDestroy;
 import java.time.Duration;
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -32,6 +34,9 @@ public class EndlessConsumer implements Runnable
   private KafkaConsumer<String, String> consumer = null;
   private Future<?> future = null;
 
+  private Map<Integer, Map<String, Integer>> seen;
+
+
   public EndlessConsumer(
       ExecutorService executor,
       String bootstrapServer,
@@ -66,6 +71,8 @@ public class EndlessConsumer implements Runnable
       log.info("{} - Subscribing to topic {}", id, topic);
       consumer.subscribe(Arrays.asList(topic));
 
+      seen = new HashMap<>();
+
       while (true)
       {
         ConsumerRecords<String, String> records =
@@ -85,6 +92,21 @@ public class EndlessConsumer implements Runnable
               record.key(),
               record.value()
           );
+
+          Integer partition = record.partition();
+          String key = record.key();
+
+          if (!seen.containsKey(partition))
+            seen.put(partition, new HashMap<>());
+
+          Map<String, Integer> byKey = seen.get(partition);
+
+          if (!byKey.containsKey(key))
+            byKey.put(key, 0);
+
+          int seenByKey = byKey.get(key);
+          seenByKey++;
+          byKey.put(key, seenByKey);
         }
       }
     }
@@ -101,10 +123,30 @@ public class EndlessConsumer implements Runnable
     {
       log.info("{} - Closing the KafkaConsumer", id);
       consumer.close();
+
+      for (Integer partition : seen.keySet())
+      {
+        Map<String, Integer> byKey = seen.get(partition);
+        for (String key : byKey.keySet())
+        {
+          log.info(
+              "{} - Seen {} messages for partition={}|key={}",
+              id,
+              byKey.get(key),
+              partition,
+              key);
+        }
+      }
+      seen = null;
+
       log.info("{} - Consumer-Thread exiting", id);
     }
   }
 
+  public Map<Integer, Map<String, Integer>> getSeen()
+  {
+    return seen;
+  }
 
   public synchronized void start()
   {
-- 
2.20.1