Wordcount-Implementierung mit Kafka-Boardmitteln und MongoDB als Storage
author Kai Moritz <kai@juplo.de>
Sun, 24 Jul 2022 19:34:43 +0000 (21:34 +0200)
committer Kai Moritz <kai@juplo.de>
Fri, 12 Aug 2022 14:45:21 +0000 (16:45 +0200)
* Zählt die Wörter pro Benutzer.
* Simple Implementierung mit Maps.
* Verwendet die bereits für das Speichern der Nachrichten-Zählung und
  der Offsets verwendete MongoDB-Anbindung zum Speichern.
* Typisierung zurückgenommen: Immer String für Key/Value
* Verwendet aus Bequemlichkeit den Seen-Endpoint von der Zählung.

docker-compose.yml
pom.xml
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java
src/main/java/de/juplo/kafka/DriverController.java
src/main/java/de/juplo/kafka/EndlessConsumer.java
src/main/java/de/juplo/kafka/StatisticsDocument.java
src/test/java/de/juplo/kafka/ApplicationTests.java

index 0e420b6..df41cb5 100644 (file)
@@ -56,7 +56,7 @@ services:
     command: sleep infinity
 
   producer:
-    image: juplo/endless-producer:1.0-SNAPSHOT
+    image: juplo/rest-producer:1.0-SNAPSHOT
     ports:
       - 8080:8080
     environment:
@@ -64,11 +64,9 @@ services:
       producer.bootstrap-server: kafka:9092
       producer.client-id: producer
       producer.topic: test
-      producer.throttle-ms: 500
-
 
   peter:
-    image: juplo/endless-consumer:1.0-SNAPSHOT
+    image: juplo/wordcount:1.0-SNAPSHOT
     ports:
       - 8081:8080
     environment:
@@ -80,7 +78,7 @@ services:
       spring.data.mongodb.database: juplo
 
   beate:
-    image: juplo/endless-consumer:1.0-SNAPSHOT
+    image: juplo/wordcount:1.0-SNAPSHOT
     ports:
       - 8082:8080
     environment:
diff --git a/pom.xml b/pom.xml
index 701704d..fe06959 100644 (file)
--- a/pom.xml
+++ b/pom.xml
   </parent>
 
   <groupId>de.juplo.kafka</groupId>
-  <artifactId>endless-consumer</artifactId>
+  <artifactId>wordcount</artifactId>
   <version>1.0-SNAPSHOT</version>
-  <name>Endless Consumer: a Simple Consumer-Group that reads and prints the topic and counts the received messages for each key by topic</name>
+  <name>Wordcount</name>
+  <description>Splits the incomming sentences into words and counts the words per user.</description>
 
   <dependencies>
     <dependency>
index 08c3955..2cf263e 100644 (file)
@@ -19,32 +19,21 @@ import java.util.function.Consumer;
 public class ApplicationConfiguration
 {
   @Bean
-  public Consumer<ConsumerRecord<String, String>> consumer()
-  {
-    return (record) ->
-    {
-      // Handle record
-    };
-  }
-
-  @Bean
-  public EndlessConsumer<String, String> endlessConsumer(
+  public EndlessConsumer endlessConsumer(
       KafkaConsumer<String, String> kafkaConsumer,
       ExecutorService executor,
-      Consumer<ConsumerRecord<String, String>> handler,
       PartitionStatisticsRepository repository,
       ApplicationProperties properties)
   {
     return
-        new EndlessConsumer<>(
+        new EndlessConsumer(
             executor,
             repository,
             properties.getClientId(),
             properties.getTopic(),
             Clock.systemDefaultZone(),
             properties.getCommitInterval(),
-            kafkaConsumer,
-            handler);
+            kafkaConsumer);
   }
 
   @Bean
index df4e653..ab9782c 100644 (file)
@@ -10,7 +10,7 @@ import org.springframework.stereotype.Component;
 @RequiredArgsConstructor
 public class ApplicationHealthIndicator implements HealthIndicator
 {
-  private final EndlessConsumer<String, String> consumer;
+  private final EndlessConsumer consumer;
 
 
   @Override
index ed38080..e64d6b8 100644 (file)
@@ -2,11 +2,8 @@ package de.juplo.kafka;
 
 import lombok.RequiredArgsConstructor;
 import org.springframework.http.HttpStatus;
-import org.springframework.web.bind.annotation.ExceptionHandler;
-import org.springframework.web.bind.annotation.GetMapping;
-import org.springframework.web.bind.annotation.PostMapping;
-import org.springframework.web.bind.annotation.ResponseStatus;
-import org.springframework.web.bind.annotation.RestController;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.*;
 
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
@@ -33,11 +30,24 @@ public class DriverController
 
 
   @GetMapping("seen")
-  public Map<Integer, Map<String, Long>> seen()
+  public Map<Integer, Map<String, Map<String, Long>>> seen()
   {
     return consumer.getSeen();
   }
 
+  @GetMapping("seen/{user}")
+  public ResponseEntity<Map<String, Long>> seen(@PathVariable String user)
+  {
+    for (Map<String, Map<String, Long>> users : consumer.getSeen().values())
+    {
+      Map<String, Long> words = users.get(user);
+      if (words != null)
+        return ResponseEntity.ok(words);
+    }
+
+    return ResponseEntity.notFound().build();
+  }
+
 
   @ExceptionHandler
   @ResponseStatus(HttpStatus.BAD_REQUEST)
index f9a9629..01f9057 100644 (file)
@@ -17,20 +17,23 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.regex.Pattern;
 
 
 @Slf4j
 @RequiredArgsConstructor
-public class EndlessConsumer<K, V> implements ConsumerRebalanceListener, Runnable
+public class EndlessConsumer implements ConsumerRebalanceListener, Runnable
 {
+  final static Pattern PATTERN = Pattern.compile("\\W+");
+
+
   private final ExecutorService executor;
   private final PartitionStatisticsRepository repository;
   private final String id;
   private final String topic;
   private final Clock clock;
   private final Duration commitInterval;
-  private final Consumer<K, V> consumer;
-  private final java.util.function.Consumer<ConsumerRecord<K, V>> handler;
+  private final Consumer<String, String> consumer;
 
   private final Lock lock = new ReentrantLock();
   private final Condition condition = lock.newCondition();
@@ -38,7 +41,7 @@ public class EndlessConsumer<K, V> implements ConsumerRebalanceListener, Runnabl
   private Exception exception;
   private long consumed = 0;
 
-  private final Map<Integer, Map<String, Long>> seen = new HashMap<>();
+  private final Map<Integer, Map<String, Map<String, Long>>> seen = new HashMap<>();
 
 
   @Override
@@ -53,16 +56,7 @@ public class EndlessConsumer<K, V> implements ConsumerRebalanceListener, Runnabl
           id,
           partition,
           newOffset);
-      Map<String, Long> removed = seen.remove(partition);
-      for (String key : removed.keySet())
-      {
-        log.info(
-            "{} - Seen {} messages for partition={}|key={}",
-            id,
-            removed.get(key),
-            partition,
-            key);
-      }
+      Map<String, Map<String, Long>> removed = seen.remove(partition);
       repository.save(new StatisticsDocument(partition, removed, consumer.position(tp)));
     });
   }
@@ -102,12 +96,12 @@ public class EndlessConsumer<K, V> implements ConsumerRebalanceListener, Runnabl
 
       while (true)
       {
-        ConsumerRecords<K, V> records =
+        ConsumerRecords<String, String> records =
             consumer.poll(Duration.ofSeconds(1));
 
         // Do something with the data...
         log.info("{} - Received {} messages", id, records.count());
-        for (ConsumerRecord<K, V> record : records)
+        for (ConsumerRecord<String, String> record : records)
         {
           log.info(
               "{} - {}: {}/{} - {}={}",
@@ -119,20 +113,32 @@ public class EndlessConsumer<K, V> implements ConsumerRebalanceListener, Runnabl
               record.value()
           );
 
-          handler.accept(record);
-
           consumed++;
 
           Integer partition = record.partition();
-          String key = record.key() == null ? "NULL" : record.key().toString();
-          Map<String, Long> byKey = seen.get(partition);
-
-          if (!byKey.containsKey(key))
-            byKey.put(key, 0l);
-
-          long seenByKey = byKey.get(key);
-          seenByKey++;
-          byKey.put(key, seenByKey);
+          String user = record.key();
+          Map<String, Map<String, Long>> users = seen.get(partition);
+
+          Map<String, Long> words = users.get(user);
+          if (words == null)
+          {
+            words = new HashMap<>();
+            users.put(user, words);
+          }
+
+          for (String word : PATTERN.split(record.value()))
+          {
+            Long num = words.get(word);
+            if (num == null)
+            {
+              num = 1l;
+            }
+            else
+            {
+              num++;
+            }
+            words.put(word, num);
+          }
         }
 
         if (lastCommit.plus(commitInterval).isBefore(clock.instant()))
@@ -212,7 +218,7 @@ public class EndlessConsumer<K, V> implements ConsumerRebalanceListener, Runnabl
     }
   }
 
-  public Map<Integer, Map<String, Long>> getSeen()
+  public Map<Integer, Map<String, Map<String, Long>>> getSeen()
   {
     return seen;
   }
index 1244f45..137c9bb 100644 (file)
@@ -15,7 +15,7 @@ public class StatisticsDocument
   @Id
   public String id;
   public long offset = -1l;
-  public Map<String, Long> statistics;
+  public Map<String, Map<String, Long>> statistics;
 
   public StatisticsDocument()
   {
@@ -27,7 +27,7 @@ public class StatisticsDocument
     this.statistics = new HashMap<>();
   }
 
-  public StatisticsDocument(Integer partition, Map<String, Long> statistics, long offset)
+  public StatisticsDocument(Integer partition, Map<String, Map<String, Long>> statistics, long offset)
   {
     this.id = Integer.toString(partition);
     this.statistics = statistics;
index ca72e3c..aa6dd4d 100644 (file)
@@ -73,7 +73,7 @@ class ApplicationTests
        PartitionStatisticsRepository repository;
 
        Consumer<ConsumerRecord<String, String>> testHandler;
-       EndlessConsumer<String, String> endlessConsumer;
+       EndlessConsumer endlessConsumer;
        Map<TopicPartition, Long> oldOffsets;
        Map<TopicPartition, Long> newOffsets;
        Set<ConsumerRecord<String, String>> receivedRecords;
@@ -228,15 +228,14 @@ class ApplicationTests
                                };
 
                endlessConsumer =
-                               new EndlessConsumer<>(
+                               new EndlessConsumer(
                                                executor,
                                                repository,
                                                properties.getClientId(),
                                                properties.getTopic(),
                                                Clock.systemDefaultZone(),
                                                properties.getCommitInterval(),
-                                               kafkaConsumer,
-                                               captureOffsetAndExecuteTestHandler);
+                                               kafkaConsumer);
 
                endlessConsumer.start();
        }