+++ /dev/null
-package de.juplo.kafka;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-
-import java.util.HashMap;
-import java.util.Map;
-
-
-@Slf4j
-public class KeyCountingRecordHandler implements RecordHandler<String, Long>
-{
- private final Map<Integer, Map<String, Long>> seen = new HashMap<>();
-
-
- @Override
- public void accept(ConsumerRecord<String, Long> record)
- {
- Integer partition = record.partition();
- String key = record.key() == null ? "NULL" : record.key().toString();
-
- if (!seen.containsKey(partition))
- seen.put(partition, new HashMap<>());
-
- Map<String, Long> byKey = seen.get(partition);
-
- if (!byKey.containsKey(key))
- byKey.put(key, 0l);
-
- long seenByKey = byKey.get(key);
- seenByKey++;
- byKey.put(key, seenByKey);
- }
-
-
- public Map<Integer, Map<String, Long>> getSeen()
- {
- return seen;
- }
-}