Fixes für Setup/README.sh aus 'deserialization' in 'stored-offsets' gemerged
[demos/kafka/training] / src / main / java / de / juplo / kafka / KeyCountingRecordHandler.java
1 package de.juplo.kafka;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.clients.consumer.ConsumerRecord;
5
6 import java.util.HashMap;
7 import java.util.Map;
8
9
10 @Slf4j
11 public class KeyCountingRecordHandler implements RecordHandler<String, Long>
12 {
13   private final Map<Integer, Map<String, Long>> seen = new HashMap<>();
14
15
16   @Override
17   public void accept(ConsumerRecord<String, Long> record)
18   {
19     Integer partition = record.partition();
20     String key = record.key() == null ? "NULL" : record.key().toString();
21     Map<String, Long> byKey = seen.get(partition);
22
23     if (!byKey.containsKey(key))
24       byKey.put(key, 0l);
25
26     long seenByKey = byKey.get(key);
27     seenByKey++;
28     byKey.put(key, seenByKey);
29   }
30
31   public void addPartition(Integer partition, Map<String, Long> statistics)
32   {
33     seen.put(partition, statistics);
34   }
35
36   public Map<String, Long> removePartition(Integer partition)
37   {
38     return seen.remove(partition);
39   }
40
41
42   public Map<Integer, Map<String, Long>> getSeen()
43   {
44     return seen;
45   }
46 }