Verbesserungen aus 'rebalance-listener' nach 'counting-consumer' gemerged
[demos/kafka/training] / src / main / java / de / juplo / kafka / KeyCountingRecordHandler.java
index 099dcf7..83b3ff2 100644 (file)
@@ -18,6 +18,10 @@ public class KeyCountingRecordHandler implements RecordHandler<String, Long>
   {
     Integer partition = record.partition();
     String key = record.key() == null ? "NULL" : record.key().toString();
+
+    if (!seen.containsKey(partition))
+      seen.put(partition, new HashMap<>());
+
     Map<String, Long> byKey = seen.get(partition);
 
     if (!byKey.containsKey(key))
@@ -28,16 +32,6 @@ public class KeyCountingRecordHandler implements RecordHandler<String, Long>
     byKey.put(key, seenByKey);
   }
 
-  public void addPartition(Integer partition, Map<String, Long> statistics)
-  {
-    seen.put(partition, statistics);
-  }
-
-  public Map<String, Long> removePartition(Integer partition)
-  {
-    return seen.remove(partition);
-  }
-
 
   public Map<Integer, Map<String, Long>> getSeen()
   {