Verbesserungen und Fachlogik-Test aus 'sumup-adder' gemerged
[demos/kafka/training] / src / main / java / de / juplo / kafka / ApplicationRecordHandler.java
index d0d385c..596f3da 100644 (file)
@@ -1,5 +1,6 @@
 package de.juplo.kafka;
 
+import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 
@@ -7,9 +8,12 @@ import java.util.HashMap;
 import java.util.Map;
 
 
+@RequiredArgsConstructor
 @Slf4j
 public class ApplicationRecordHandler implements RecordHandler<String, String>
 {
+  private final AdderResults results;
+
   private final Map<Integer, AdderBusinessLogic> state = new HashMap<>();
 
 
@@ -19,29 +23,24 @@ public class ApplicationRecordHandler implements RecordHandler<String, String>
     Integer partition = record.partition();
     String user = record.key();
     String message = record.value();
-    switch (message)
+
+    if (message.equals("CALCULATE"))
     {
-      case "START":
-        state.get(partition).startSum(user);
-        break;
-
-      case "END":
-        Long result = state.get(partition).endSum(user);
-        log.info("New result for {}: {}", user, result);
-        break;
-
-      default:
-        state.get(partition).addToSum(user, Integer.parseInt(message));
-        break;
+      AdderResult result = state.get(partition).calculate(user);
+      log.info("New result for {}: {}", user, result);
+      results.addResults(partition, user, result);
+      return;
     }
+
+    state.get(partition).addToSum(user, Integer.parseInt(message));
   }
 
-  protected void addPartition(Integer partition, Map<String, Long> state)
+  protected void addPartition(Integer partition, Map<String, AdderResult> state)
   {
     this.state.put(partition, new AdderBusinessLogic(state));
   }
 
-  protected Map<String, Long> removePartition(Integer partition)
+  protected Map<String, AdderResult> removePartition(Integer partition)
   {
     return this.state.remove(partition).getState();
   }
@@ -51,4 +50,9 @@ public class ApplicationRecordHandler implements RecordHandler<String, String>
   {
     return state;
   }
+
+  public AdderBusinessLogic getState(Integer partition)
+  {
+    return state.get(partition);
+  }
 }