83b3ff2cd584907548bab7f137e43bf2ac610e4a
[demos/kafka/training] / src / main / java / de / juplo / kafka / KeyCountingRecordHandler.java
1 package de.juplo.kafka;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.clients.consumer.ConsumerRecord;
5
6 import java.util.HashMap;
7 import java.util.Map;
8
9
10 @Slf4j
11 public class KeyCountingRecordHandler implements RecordHandler<String, Long>
12 {
13   private final Map<Integer, Map<String, Long>> seen = new HashMap<>();
14
15
16   @Override
17   public void accept(ConsumerRecord<String, Long> record)
18   {
19     Integer partition = record.partition();
20     String key = record.key() == null ? "NULL" : record.key().toString();
21
22     if (!seen.containsKey(partition))
23       seen.put(partition, new HashMap<>());
24
25     Map<String, Long> byKey = seen.get(partition);
26
27     if (!byKey.containsKey(key))
28       byKey.put(key, 0l);
29
30     long seenByKey = byKey.get(key);
31     seenByKey++;
32     byKey.put(key, seenByKey);
33   }
34
35
36   public Map<Integer, Map<String, Long>> getSeen()
37   {
38     return seen;
39   }
40 }