X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FKeyCountingRecordHandler.java;fp=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FKeyCountingRecordHandler.java;h=0000000000000000000000000000000000000000;hb=61581ed5dfbb70f66390e7c3e9c261c6e6aa74d4;hp=83b3ff2cd584907548bab7f137e43bf2ac610e4a;hpb=487ca1ac0c73b1c07547d5316d145ed46490ba9d;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/KeyCountingRecordHandler.java b/src/main/java/de/juplo/kafka/KeyCountingRecordHandler.java deleted file mode 100644 index 83b3ff2..0000000 --- a/src/main/java/de/juplo/kafka/KeyCountingRecordHandler.java +++ /dev/null @@ -1,40 +0,0 @@ -package de.juplo.kafka; - -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerRecord; - -import java.util.HashMap; -import java.util.Map; - - -@Slf4j -public class KeyCountingRecordHandler implements RecordHandler -{ - private final Map> seen = new HashMap<>(); - - - @Override - public void accept(ConsumerRecord record) - { - Integer partition = record.partition(); - String key = record.key() == null ? "NULL" : record.key().toString(); - - if (!seen.containsKey(partition)) - seen.put(partition, new HashMap<>()); - - Map byKey = seen.get(partition); - - if (!byKey.containsKey(key)) - byKey.put(key, 0l); - - long seenByKey = byKey.get(key); - seenByKey++; - byKey.put(key, seenByKey); - } - - - public Map> getSeen() - { - return seen; - } -}