WIP
[demos/kafka/training] / src / main / java / de / juplo / kafka / SumRecordHandler.java
index d4ec38f..b0fd27b 100644 (file)
@@ -6,11 +6,12 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
+
 
 @Slf4j
 public class SumRecordHandler implements RecordHandler<String, String>
 {
-  private final Map<Integer, Map<String, List<Long>>> seen = new HashMap<>();
   private final Map<Integer, SumBusinessLogic> state = new HashMap<>();
 
 
@@ -24,36 +25,32 @@ public class SumRecordHandler implements RecordHandler<String, String>
     {
       case "START":
         state.get(partition).startSum(user);
-        return;
+        break;
 
       case "END":
         Long result = state.get(partition).endSum(user);
         log.info("New result for {}: {}", user, result);
-        return;
+        break;
 
       default:
         state.get(partition).addToSum(user, Integer.parseInt(message));
-        return;
+        break;
     }
   }
 
-  protected void addPartition(Integer partition, StateDocument document)
+  protected void addPartition(Integer partition, Map<String, Long> state)
   {
-    this.seen.put(partition, document.seen);
-    this.state.put(partition, new SumBusinessLogic(document.state));
+    this.state.put(partition, new SumBusinessLogic(state));
   }
 
-  protected StateDocument removePartition(Integer partition)
+  protected Map<String, Long> removePartition(Integer partition)
   {
-    return new StateDocument(
-        partition,
-        this.state.remove(partition).getState(),
-        this.seen.remove(partition));
+    return this.state.remove(partition).getState();
   }
 
 
-  public Map<Integer, Map<String, List<Long>>> getSeen()
+  public Map<Integer, SumBusinessLogic> getState()
   {
-    return seen;
+    return state;
   }
 }