Die Ergebnisse werden gespeichert und sind via REST abrufbar
authorKai Moritz <kai@juplo.de>
Mon, 15 Aug 2022 20:15:17 +0000 (22:15 +0200)
committerKai Moritz <kai@juplo.de>
Mon, 15 Aug 2022 20:15:17 +0000 (22:15 +0200)
src/main/java/de/juplo/kafka/AdderResults.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java
src/main/java/de/juplo/kafka/ApplicationRecordHandler.java
src/main/java/de/juplo/kafka/DriverController.java
src/main/java/de/juplo/kafka/StateDocument.java

diff --git a/src/main/java/de/juplo/kafka/AdderResults.java b/src/main/java/de/juplo/kafka/AdderResults.java
new file mode 100644 (file)
index 0000000..e7f5602
--- /dev/null
@@ -0,0 +1,47 @@
+package de.juplo.kafka;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+
+public class AdderResults
+{
+  private final Map<Integer, Map<String, List<AdderResult>>> results = new HashMap<>();
+
+
+  public void addResults(Integer partition, String user, AdderResult result)
+  {
+    Map<String, List<AdderResult>> resultsByUser = this.results.get(partition);
+
+    List<AdderResult> results = resultsByUser.get(user);
+    if (results == null)
+    {
+      results = new LinkedList<>();
+      resultsByUser.put(user, results);
+    }
+
+    results.add(result);
+  }
+
+  protected void addPartition(Integer partition, Map<String, List<AdderResult>> results)
+  {
+    this.results.put(partition, results);
+  }
+
+  protected Map<String, List<AdderResult>> removePartition(Integer partition)
+  {
+    return this.results.remove(partition);
+  }
+
+  public Map<Integer, Map<String, List<AdderResult>>> getState()
+  {
+    return results;
+  }
+
+  public Map<String, List<AdderResult>> getState(Integer partition)
+  {
+    return results.get(partition);
+  }
+}
index 9f54083..f83661e 100644 (file)
@@ -18,20 +18,28 @@ import java.util.concurrent.Executors;
 public class ApplicationConfiguration
 {
   @Bean
-  public ApplicationRecordHandler recordHandler()
+  public ApplicationRecordHandler recordHandler(AdderResults adderResults)
   {
-    return new ApplicationRecordHandler();
+    return new ApplicationRecordHandler(adderResults);
+  }
+
+  @Bean
+  public AdderResults adderResults()
+  {
+    return new AdderResults();
   }
 
   @Bean
   public ApplicationRebalanceListener rebalanceListener(
       ApplicationRecordHandler recordHandler,
+      AdderResults adderResults,
       StateRepository stateRepository,
       Consumer<String, String> consumer,
       ApplicationProperties properties)
   {
     return new ApplicationRebalanceListener(
         recordHandler,
+        adderResults,
         stateRepository,
         properties.getClientId(),
         properties.getTopic(),
index 5a01393..cd9da64 100644 (file)
@@ -8,8 +8,7 @@ import org.apache.kafka.common.TopicPartition;
 import java.time.Clock;
 import java.time.Duration;
 import java.time.Instant;
-import java.util.Collection;
-import java.util.Map;
+import java.util.*;
 
 
 @RequiredArgsConstructor
@@ -17,6 +16,7 @@ import java.util.Map;
 public class ApplicationRebalanceListener implements PollIntervalAwareConsumerRebalanceListener
 {
   private final ApplicationRecordHandler recordHandler;
+  private final AdderResults adderResults;
   private final StateRepository stateRepository;
   private final String id;
   private final String topic;
@@ -24,6 +24,8 @@ public class ApplicationRebalanceListener implements PollIntervalAwareConsumerRe
   private final Duration commitInterval;
   private final Consumer<String, String> consumer;
 
+  private final Set<Integer> partitions = new HashSet<>();
+
   private Instant lastCommit = Instant.EPOCH;
   private boolean commitsEnabled = true;
 
@@ -33,6 +35,7 @@ public class ApplicationRebalanceListener implements PollIntervalAwareConsumerRe
     partitions.forEach(tp ->
     {
       Integer partition = tp.partition();
+      this.partitions.add(partition);
       StateDocument document =
           stateRepository
               .findById(Integer.toString(partition))
@@ -45,6 +48,7 @@ public class ApplicationRebalanceListener implements PollIntervalAwareConsumerRe
         consumer.seek(tp, document.offset);
       }
       recordHandler.addPartition(partition, document.state);
+      adderResults.addPartition(partition, document.results);
     });
   }
 
@@ -54,6 +58,7 @@ public class ApplicationRebalanceListener implements PollIntervalAwareConsumerRe
     partitions.forEach(tp ->
     {
       Integer partition = tp.partition();
+      this.partitions.remove(partition);
       Long offset = consumer.position(tp);
       log.info(
           "{} - removing partition: {}, offset of next message {})",
@@ -62,8 +67,9 @@ public class ApplicationRebalanceListener implements PollIntervalAwareConsumerRe
           offset);
       if (commitsEnabled)
       {
-        Map<String, AdderResult> removed = recordHandler.removePartition(partition);
-        stateRepository.save(new StateDocument(partition, removed, offset));
+        Map<String, AdderResult> state = recordHandler.removePartition(partition);
+        Map<String, List<AdderResult>> results = adderResults.removePartition(partition);
+        stateRepository.save(new StateDocument(partition, state, results, offset));
       }
       else
       {
@@ -85,11 +91,12 @@ public class ApplicationRebalanceListener implements PollIntervalAwareConsumerRe
     if (lastCommit.plus(commitInterval).isBefore(clock.instant()))
     {
       log.debug("Storing data and offsets, last commit: {}", lastCommit);
-      recordHandler.getState().forEach((partiton, adder) -> stateRepository.save(
+      partitions.forEach(partition -> stateRepository.save(
           new StateDocument(
-              partiton,
-              adder.getState(),
-              consumer.position(new TopicPartition(topic, partiton)))));
+              partition,
+              recordHandler.getState(partition).getState(),
+              adderResults.getState(partition),
+              consumer.position(new TopicPartition(topic, partition)))));
       lastCommit = clock.instant();
     }
   }
index 93e1297..596f3da 100644 (file)
@@ -1,5 +1,6 @@
 package de.juplo.kafka;
 
+import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 
@@ -7,9 +8,12 @@ import java.util.HashMap;
 import java.util.Map;
 
 
+@RequiredArgsConstructor
 @Slf4j
 public class ApplicationRecordHandler implements RecordHandler<String, String>
 {
+  private final AdderResults results;
+
   private final Map<Integer, AdderBusinessLogic> state = new HashMap<>();
 
 
@@ -24,6 +28,7 @@ public class ApplicationRecordHandler implements RecordHandler<String, String>
     {
       AdderResult result = state.get(partition).calculate(user);
       log.info("New result for {}: {}", user, result);
+      results.addResults(partition, user, result);
       return;
     }
 
@@ -45,4 +50,9 @@ public class ApplicationRecordHandler implements RecordHandler<String, String>
   {
     return state;
   }
+
+  public AdderBusinessLogic getState(Integer partition)
+  {
+    return state.get(partition);
+  }
 }
index 63f015d..26a5bc8 100644 (file)
@@ -5,6 +5,7 @@ import org.springframework.http.HttpStatus;
 import org.springframework.http.ResponseEntity;
 import org.springframework.web.bind.annotation.*;
 
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ExecutionException;
@@ -17,6 +18,7 @@ public class DriverController
 {
   private final EndlessConsumer consumer;
   private final ApplicationRecordHandler recordHandler;
+  private final AdderResults results;
 
 
   @PostMapping("start")
@@ -46,7 +48,7 @@ public class DriverController
   }
 
   @GetMapping("state/{user}")
-  public ResponseEntity<Long> seen(@PathVariable String user)
+  public ResponseEntity<Long> state(@PathVariable String user)
   {
     for (AdderBusinessLogic adder : recordHandler.getState().values())
     {
@@ -58,6 +60,25 @@ public class DriverController
     return ResponseEntity.notFound().build();
   }
 
+  @GetMapping("results")
+  public Map<Integer, Map<String, List<AdderResult>>> results()
+  {
+    return results.getState();
+  }
+
+  @GetMapping("results/{user}")
+  public ResponseEntity<List<AdderResult>> results(@PathVariable String user)
+  {
+    for (Map<String, List<AdderResult>> resultsByUser : this.results.getState().values())
+    {
+      List<AdderResult> results = resultsByUser.get(user);
+      if (results != null)
+        return ResponseEntity.ok(results);
+    }
+
+    return ResponseEntity.notFound().build();
+  }
+
 
   @ExceptionHandler
   @ResponseStatus(HttpStatus.BAD_REQUEST)
index 82306d0..c10a50c 100644 (file)
@@ -5,6 +5,7 @@ import org.springframework.data.annotation.Id;
 import org.springframework.data.mongodb.core.mapping.Document;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 
@@ -16,6 +17,7 @@ public class StateDocument
   public String id;
   public long offset = -1l;
   public Map<String, AdderResult> state;
+  public Map<String, List<AdderResult>> results;
 
   public StateDocument()
   {
@@ -25,15 +27,18 @@ public class StateDocument
   {
     this.id = Integer.toString(partition);
     this.state = new HashMap<>();
+    this.results = new HashMap<>();
   }
 
   public StateDocument(
       Integer partition,
       Map<String, AdderResult> state,
+      Map<String, List<AdderResult>> results,
       long offset)
   {
     this.id = Integer.toString(partition);
     this.state = state;
+    this.results = results;
     this.offset = offset;
   }
 }