X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationRecordHandler.java;h=dfbf82ea6c70293ea259ef54f5250bfa4b105b48;hb=refs%2Fheads%2Frebalance-listener;hp=3492c0d4015acaee3bdedae05d939b8258ee8449;hpb=61581ed5dfbb70f66390e7c3e9c261c6e6aa74d4;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java index 3492c0d..dfbf82e 100644 --- a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java +++ b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java @@ -18,10 +18,6 @@ public class ApplicationRecordHandler implements RecordHandler { Integer partition = record.partition(); String key = record.key() == null ? "NULL" : record.key().toString(); - - if (!state.containsKey(partition)) - state.put(partition, new HashMap<>()); - Map byKey = state.get(partition); if (!byKey.containsKey(key)) @@ -32,6 +28,16 @@ public class ApplicationRecordHandler implements RecordHandler byKey.put(key, seenByKey); } + public void addPartition(Integer partition, Map statistics) + { + state.put(partition, statistics); + } + + public Map removePartition(Integer partition) + { + return state.remove(partition); + } + public Map> getState() {