c2c26573546e2e663ba86fff31be9630ebbbe0db
[demos/kafka/training] / src / main / java / de / juplo / kafka / ApplicationRecordHandler.java
1 package de.juplo.kafka;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.clients.consumer.ConsumerRecord;
5
6 import java.util.HashMap;
7 import java.util.Map;
8
9
10 @Slf4j
11 public class ApplicationRecordHandler implements RecordHandler<String, Long>
12 {
13   private final Map<Integer, Map<String, Long>> state = new HashMap<>();
14
15
16   @Override
17   public void accept(ConsumerRecord<String, Long> record)
18   {
19     Integer partition = record.partition();
20     String key = record.key() == null ? "NULL" : record.key().toString();
21     Map<String, Long> byKey = state.get(partition);
22
23     if (!byKey.containsKey(key))
24       byKey.put(key, 0l);
25
26     long seenByKey = byKey.get(key);
27     seenByKey++;
28     byKey.put(key, seenByKey);
29   }
30
31   protected void addPartition(Integer partition, Map<String, Long> state)
32   {
33     this.state.put(partition, state);
34   }
35
36   protected Map<String, Long> removePartition(Integer partition)
37   {
38     return this.state.remove(partition);
39   }
40
41
42   public Map<Integer, Map<String, Long>> getState()
43   {
44     return state;
45   }
46 }