Benennung vereinheitlicht und projektunabhängig gemacht
[demos/kafka/training] / src / main / java / de / juplo / kafka / ApplicationRecordHandler.java
index 3492c0d..dfbf82e 100644 (file)
@@ -18,10 +18,6 @@ public class ApplicationRecordHandler implements RecordHandler<String, Long>
   {
     Integer partition = record.partition();
     String key = record.key() == null ? "NULL" : record.key().toString();
-
-    if (!state.containsKey(partition))
-      state.put(partition, new HashMap<>());
-
     Map<String, Long> byKey = state.get(partition);
 
     if (!byKey.containsKey(key))
@@ -32,6 +28,16 @@ public class ApplicationRecordHandler implements RecordHandler<String, Long>
     byKey.put(key, seenByKey);
   }
 
+  public void addPartition(Integer partition, Map<String, Long> statistics)
+  {
+    state.put(partition, statistics);
+  }
+
+  public Map<String, Long> removePartition(Integer partition)
+  {
+    return state.remove(partition);
+  }
+
 
   public Map<Integer, Map<String, Long>> getState()
   {