Wordcount-Implementierung mit Kafka-Boardmitteln und MongoDB als Storage
[demos/kafka/training] / src / main / java / de / juplo / kafka / EndlessConsumer.java
index f9a9629..01f9057 100644 (file)
@@ -17,20 +17,23 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.regex.Pattern;
 
 
 @Slf4j
 @RequiredArgsConstructor
-public class EndlessConsumer<K, V> implements ConsumerRebalanceListener, Runnable
+public class EndlessConsumer implements ConsumerRebalanceListener, Runnable
 {
+  final static Pattern PATTERN = Pattern.compile("\\W+");
+
+
   private final ExecutorService executor;
   private final PartitionStatisticsRepository repository;
   private final String id;
   private final String topic;
   private final Clock clock;
   private final Duration commitInterval;
-  private final Consumer<K, V> consumer;
-  private final java.util.function.Consumer<ConsumerRecord<K, V>> handler;
+  private final Consumer<String, String> consumer;
 
   private final Lock lock = new ReentrantLock();
   private final Condition condition = lock.newCondition();
@@ -38,7 +41,7 @@ public class EndlessConsumer<K, V> implements ConsumerRebalanceListener, Runnabl
   private Exception exception;
   private long consumed = 0;
 
-  private final Map<Integer, Map<String, Long>> seen = new HashMap<>();
+  private final Map<Integer, Map<String, Map<String, Long>>> seen = new HashMap<>();
 
 
   @Override
@@ -53,16 +56,7 @@ public class EndlessConsumer<K, V> implements ConsumerRebalanceListener, Runnabl
           id,
           partition,
           newOffset);
-      Map<String, Long> removed = seen.remove(partition);
-      for (String key : removed.keySet())
-      {
-        log.info(
-            "{} - Seen {} messages for partition={}|key={}",
-            id,
-            removed.get(key),
-            partition,
-            key);
-      }
+      Map<String, Map<String, Long>> removed = seen.remove(partition);
       repository.save(new StatisticsDocument(partition, removed, consumer.position(tp)));
     });
   }
@@ -102,12 +96,12 @@ public class EndlessConsumer<K, V> implements ConsumerRebalanceListener, Runnabl
 
       while (true)
       {
-        ConsumerRecords<K, V> records =
+        ConsumerRecords<String, String> records =
             consumer.poll(Duration.ofSeconds(1));
 
         // Do something with the data...
         log.info("{} - Received {} messages", id, records.count());
-        for (ConsumerRecord<K, V> record : records)
+        for (ConsumerRecord<String, String> record : records)
         {
           log.info(
               "{} - {}: {}/{} - {}={}",
@@ -119,20 +113,32 @@ public class EndlessConsumer<K, V> implements ConsumerRebalanceListener, Runnabl
               record.value()
           );
 
-          handler.accept(record);
-
           consumed++;
 
           Integer partition = record.partition();
-          String key = record.key() == null ? "NULL" : record.key().toString();
-          Map<String, Long> byKey = seen.get(partition);
-
-          if (!byKey.containsKey(key))
-            byKey.put(key, 0l);
-
-          long seenByKey = byKey.get(key);
-          seenByKey++;
-          byKey.put(key, seenByKey);
+          String user = record.key();
+          Map<String, Map<String, Long>> users = seen.get(partition);
+
+          Map<String, Long> words = users.get(user);
+          if (words == null)
+          {
+            words = new HashMap<>();
+            users.put(user, words);
+          }
+
+          for (String word : PATTERN.split(record.value()))
+          {
+            Long num = words.get(word);
+            if (num == null)
+            {
+              num = 1l;
+            }
+            else
+            {
+              num++;
+            }
+            words.put(word, num);
+          }
         }
 
         if (lastCommit.plus(commitInterval).isBefore(clock.instant()))
@@ -212,7 +218,7 @@ public class EndlessConsumer<K, V> implements ConsumerRebalanceListener, Runnabl
     }
   }
 
-  public Map<Integer, Map<String, Long>> getSeen()
+  public Map<Integer, Map<String, Map<String, Long>>> getSeen()
   {
     return seen;
   }