Benennung vereinheitlicht und projektunabhängig gemacht
[demos/kafka/training] / src / main / java / de / juplo / kafka / ApplicationRecordHandler.java
1 package de.juplo.kafka;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.clients.consumer.ConsumerRecord;
5
6 import java.util.HashMap;
7 import java.util.Map;
8
9
10 @Slf4j
11 public class ApplicationRecordHandler implements RecordHandler<String, Long>
12 {
13   private final Map<Integer, Map<String, Long>> state = new HashMap<>();
14
15
16   @Override
17   public void accept(ConsumerRecord<String, Long> record)
18   {
19     Integer partition = record.partition();
20     String key = record.key() == null ? "NULL" : record.key().toString();
21
22     if (!state.containsKey(partition))
23       state.put(partition, new HashMap<>());
24
25     Map<String, Long> byKey = state.get(partition);
26
27     if (!byKey.containsKey(key))
28       byKey.put(key, 0l);
29
30     long seenByKey = byKey.get(key);
31     seenByKey++;
32     byKey.put(key, seenByKey);
33   }
34
35
36   public Map<Integer, Map<String, Long>> getState()
37   {
38     return state;
39   }
40 }