Verbesserte Tests und Korrekturen gemerged: sumup-adder -> stored-offsets
[demos/kafka/training] / src / main / java / de / juplo / kafka / ApplicationRecordHandler.java
index d0d385c..c2c2657 100644 (file)
@@ -8,46 +8,38 @@ import java.util.Map;
 
 
 @Slf4j
-public class ApplicationRecordHandler implements RecordHandler<String, String>
+public class ApplicationRecordHandler implements RecordHandler<String, Long>
 {
-  private final Map<Integer, AdderBusinessLogic> state = new HashMap<>();
+  private final Map<Integer, Map<String, Long>> state = new HashMap<>();
 
 
   @Override
-  public void accept(ConsumerRecord<String, String> record)
+  public void accept(ConsumerRecord<String, Long> record)
   {
     Integer partition = record.partition();
-    String user = record.key();
-    String message = record.value();
-    switch (message)
-    {
-      case "START":
-        state.get(partition).startSum(user);
-        break;
-
-      case "END":
-        Long result = state.get(partition).endSum(user);
-        log.info("New result for {}: {}", user, result);
-        break;
-
-      default:
-        state.get(partition).addToSum(user, Integer.parseInt(message));
-        break;
-    }
+    String key = record.key() == null ? "NULL" : record.key().toString();
+    Map<String, Long> byKey = state.get(partition);
+
+    if (!byKey.containsKey(key))
+      byKey.put(key, 0l);
+
+    long seenByKey = byKey.get(key);
+    seenByKey++;
+    byKey.put(key, seenByKey);
   }
 
   protected void addPartition(Integer partition, Map<String, Long> state)
   {
-    this.state.put(partition, new AdderBusinessLogic(state));
+    this.state.put(partition, state);
   }
 
   protected Map<String, Long> removePartition(Integer partition)
   {
-    return this.state.remove(partition).getState();
+    return this.state.remove(partition);
   }
 
 
-  public Map<Integer, AdderBusinessLogic> getState()
+  public Map<Integer, Map<String, Long>> getState()
   {
     return state;
   }