WIP
author Kai Moritz <kai@juplo.de>
Sat, 13 Aug 2022 14:53:54 +0000 (16:53 +0200)
committer Kai Moritz <kai@juplo.de>
Sat, 13 Aug 2022 14:53:54 +0000 (16:53 +0200)
src/main/java/de/juplo/kafka/DriverController.java
src/main/java/de/juplo/kafka/PartitionStatisticsRepository.java
src/main/java/de/juplo/kafka/StateDocument.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/StatisticsDocument.java [deleted file]
src/main/java/de/juplo/kafka/SumBusinessLogic.java
src/main/java/de/juplo/kafka/SumRebalanceListener.java
src/main/java/de/juplo/kafka/SumRecordHandler.java
src/test/java/de/juplo/kafka/ApplicationTests.java

index 5a09c1b..fdae76f 100644 (file)
@@ -5,6 +5,7 @@ import org.springframework.http.HttpStatus;
 import org.springframework.http.ResponseEntity;
 import org.springframework.web.bind.annotation.*;
 
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
 
@@ -14,7 +15,7 @@ import java.util.concurrent.ExecutionException;
 public class DriverController
 {
   private final EndlessConsumer consumer;
-  private final SumRecordHandler wordcount;
+  private final SumRecordHandler sumRecordHandler;
 
 
   @PostMapping("start")
@@ -31,19 +32,19 @@ public class DriverController
 
 
   @GetMapping("seen")
-  public Map<Integer, Map<String, Map<String, Long>>> seen()
+  public Map<Integer, Map<String, List<Long>>> seen()
   {
-    return wordcount.getSeen();
+    return sumRecordHandler.getSeen();
   }
 
   @GetMapping("seen/{user}")
-  public ResponseEntity<Map<String, Long>> seen(@PathVariable String user)
+  public ResponseEntity<List<Long>> seen(@PathVariable String user)
   {
-    for (Map<String, Map<String, Long>> users : wordcount.getSeen().values())
+    for (Map<String, List<Long>> users : sumRecordHandler.getSeen().values())
     {
-      Map<String, Long> words = users.get(user);
-      if (words != null)
-        return ResponseEntity.ok(words);
+      List<Long> results = users.get(user);
+      if (results != null)
+        return ResponseEntity.ok(results);
     }
 
     return ResponseEntity.notFound().build();
index 0ccf3cd..9e26410 100644 (file)
@@ -5,7 +5,7 @@ import org.springframework.data.mongodb.repository.MongoRepository;
 import java.util.Optional;
 
 
-public interface PartitionStatisticsRepository extends MongoRepository<StatisticsDocument, String>
+public interface PartitionStatisticsRepository extends MongoRepository<StateDocument, String>
 {
-  public Optional<StatisticsDocument> findById(String partition);
+  public Optional<StateDocument> findById(String partition);
 }
diff --git a/src/main/java/de/juplo/kafka/StateDocument.java b/src/main/java/de/juplo/kafka/StateDocument.java
new file mode 100644 (file)
index 0000000..52968cd
--- /dev/null
@@ -0,0 +1,51 @@
+package de.juplo.kafka;
+
+import lombok.ToString;
+import org.springframework.data.annotation.Id;
+import org.springframework.data.mongodb.core.mapping.Document;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+
+// Per-partition document stored in the "statistics" collection: the id is
+// the partition number as a string, the maps carry the data handed over
+// by the record handler.
+@Document(collection = "statistics")
+@ToString
+public class StateDocument
+{
+  @Id
+  public String id;
+  // -1 means that no offset has been stored yet
+  public long offset = -1l;
+  // Initialised at declaration, so that an instance built through the
+  // no-args constructor never hands null maps to the record handler
+  // NOTE(review): presumably also covers stored documents lacking these fields - confirm
+  public Map<String, Long> state = new HashMap<>();
+  public Map<String, List<Long>> seen = new HashMap<>();
+
+  // No-args constructor; presumably used by the persistence mapping
+  public StateDocument()
+  {
+  }
+
+  // Creates an empty document for the given partition
+  public StateDocument(Integer partition)
+  {
+    this.id = Integer.toString(partition);
+  }
+
+  // Creates a document for the given partition that wraps the given maps
+  // (no copies are made); the offset keeps its default of -1
+  public StateDocument(
+      Integer partition,
+      Map<String, Long> state,
+      Map<String, List<Long>> seen)
+  {
+    this.id = Integer.toString(partition);
+    this.state = state;
+    this.seen = seen;
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/StatisticsDocument.java b/src/main/java/de/juplo/kafka/StatisticsDocument.java
deleted file mode 100644 (file)
index 137c9bb..0000000
+++ /dev/null
@@ -1,36 +0,0 @@
-package de.juplo.kafka;
-
-import lombok.ToString;
-import org.springframework.data.annotation.Id;
-import org.springframework.data.mongodb.core.mapping.Document;
-
-import java.util.HashMap;
-import java.util.Map;
-
-
-@Document(collection = "statistics")
-@ToString
-public class StatisticsDocument
-{
-  @Id
-  public String id;
-  public long offset = -1l;
-  public Map<String, Map<String, Long>> statistics;
-
-  public StatisticsDocument()
-  {
-  }
-
-  public StatisticsDocument(Integer partition)
-  {
-    this.id = Integer.toString(partition);
-    this.statistics = new HashMap<>();
-  }
-
-  public StatisticsDocument(Integer partition, Map<String, Map<String, Long>> statistics, long offset)
-  {
-    this.id = Integer.toString(partition);
-    this.statistics = statistics;
-    this.offset = offset;
-  }
-}
index 74696a4..27ddebb 100644 (file)
@@ -52,4 +52,9 @@ public class SumBusinessLogic
 
     return state.get(user);
   }
+
+  protected Map<String, Long> getState()
+  {
+    return state;
+  }
 }
index 1cd738f..be752ae 100644 (file)
@@ -9,7 +9,6 @@ import java.time.Clock;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.Collection;
-import java.util.Map;
 
 
 @RequiredArgsConstructor
@@ -34,17 +33,17 @@ public class SumRebalanceListener implements PollIntervalAwareConsumerRebalanceL
       Integer partition = tp.partition();
       Long offset = consumer.position(tp);
       log.info("{} - adding partition: {}, offset={}", id, partition, offset);
-      StatisticsDocument document =
+      StateDocument document =
           repository
               .findById(Integer.toString(partition))
-              .orElse(new StatisticsDocument(partition));
+              .orElse(new StateDocument(partition));
       if (document.offset >= 0)
       {
         // Only seek, if a stored offset was found
         // Otherwise: Use initial offset, generated by Kafka
         consumer.seek(tp, document.offset);
       }
-      handler.addPartition(partition, document.statistics);
+      handler.addPartition(partition, document);
     });
   }
 
@@ -60,8 +59,12 @@ public class SumRebalanceListener implements PollIntervalAwareConsumerRebalanceL
           id,
           partition,
           newOffset);
-      Map<String, Map<String, Long>> removed = handler.removePartition(partition);
-      repository.save(new StatisticsDocument(partition, removed, consumer.position(tp)));
+      // Store the current position together with the state: a document
+      // saved with the default offset -1 is not used for seeking when
+      // the partition is assigned again
+      StateDocument document = handler.removePartition(partition);
+      document.offset = consumer.position(tp);
+      repository.save(document);
     });
   }
 
@@ -73,7 +76,7 @@ public class SumRebalanceListener implements PollIntervalAwareConsumerRebalanceL
     {
       log.debug("Storing data and offsets, last commit: {}", lastCommit);
       handler.getSeen().forEach((partiton, statistics) -> repository.save(
-          new StatisticsDocument(
+          new StateDocument(
               partiton,
               statistics,
               consumer.position(new TopicPartition(topic, partiton)))));
index 82ada38..d4ec38f 100644 (file)
@@ -4,17 +4,14 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
-import java.util.regex.Pattern;
-
 
 @Slf4j
 public class SumRecordHandler implements RecordHandler<String, String>
 {
-  final static Pattern PATTERN = Pattern.compile("\\W+");
-
-
-  private final Map<Integer, Map<String, Map<String, Long>>> seen = new HashMap<>();
+  private final Map<Integer, Map<String, List<Long>>> seen = new HashMap<>();
+  private final Map<Integer, SumBusinessLogic> state = new HashMap<>();
 
 
   @Override
@@ -22,42 +19,40 @@ public class SumRecordHandler implements RecordHandler<String, String>
   {
     Integer partition = record.partition();
     String user = record.key();
-    Map<String, Map<String, Long>> users = seen.get(partition);
-
-    Map<String, Long> words = users.get(user);
-    if (words == null)
-    {
-      words = new HashMap<>();
-      users.put(user, words);
-    }
-
-    for (String word : PATTERN.split(record.value()))
+    String message = record.value();
+    switch (message)
     {
-      Long num = words.get(word);
-      if (num == null)
-      {
-        num = 1l;
-      }
-      else
-      {
-        num++;
-      }
-      words.put(word, num);
+      case "START":
+        state.get(partition).startSum(user);
+        return;
+
+      case "END":
+        Long result = state.get(partition).endSum(user);
+        log.info("New result for {}: {}", user, result);
+        return;
+
+      default:
+        state.get(partition).addToSum(user, Integer.parseInt(message));
+        return;
     }
   }
 
-  public void addPartition(Integer partition, Map<String, Map<String, Long>> statistics)
+  protected void addPartition(Integer partition, StateDocument document)
   {
-    seen.put(partition, statistics);
+    this.seen.put(partition, document.seen);
+    this.state.put(partition, new SumBusinessLogic(document.state));
   }
 
-  public Map<String, Map<String, Long>> removePartition(Integer partition)
+  protected StateDocument removePartition(Integer partition)
   {
-    return seen.remove(partition);
+    return new StateDocument(
+        partition,
+        this.state.remove(partition).getState(),
+        this.seen.remove(partition));
   }
 
 
-  public Map<Integer, Map<String, Map<String, Long>>> getSeen()
+  public Map<Integer, Map<String, List<Long>>> getSeen()
   {
     return seen;
   }
index 09614b8..f19bfb1 100644 (file)
@@ -156,10 +156,10 @@ class ApplicationTests
                        Long offset = offsetConsumer.position(tp);
                        log.info("New position for {}: {}", tp, offset);
                        Integer partition = tp.partition();
-                       StatisticsDocument document =
+                       StateDocument document =
                                        partitionStatisticsRepository
                                                        .findById(partition.toString())
-                                                       .orElse(new StatisticsDocument(partition));
+                                                       .orElse(new StateDocument(partition));
                        document.offset = offset;
                        partitionStatisticsRepository.save(document);
                });