GRÜN: Erwartungen implementiert
[demos/kafka/training] / src / main / java / de / juplo / kafka / WordcountRecordHandler.java
1 package de.juplo.kafka;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.clients.consumer.ConsumerRecord;
5
6 import java.util.HashMap;
7 import java.util.Map;
8 import java.util.regex.Pattern;
9
10
11 @Slf4j
12 public class WordcountRecordHandler implements RecordHandler<String, String>
13 {
14   final static Pattern PATTERN = Pattern.compile("\\W+");
15
16
17   private final Map<Integer, Map<String, Map<String, Long>>> seen = new HashMap<>();
18
19
20   @Override
21   public void accept(ConsumerRecord<String, String> record)
22   {
23     Integer partition = record.partition();
24     String user = record.key();
25     Map<String, Map<String, Long>> users = seen.get(partition);
26
27     Map<String, Long> words = users.get(user);
28     if (words == null)
29     {
30       words = new HashMap<>();
31       users.put(user, words);
32     }
33
34     for (String word : PATTERN.split(record.value()))
35     {
36       Long num = words.get(word);
37       if (num == null)
38       {
39         num = 1l;
40       }
41       else
42       {
43         num++;
44       }
45       words.put(word, num);
46     }
47   }
48
49   public void addPartition(Integer partition, Map<String, Map<String, Long>> statistics)
50   {
51     seen.put(partition, statistics);
52   }
53
54   public Map<String, Map<String, Long>> removePartition(Integer partition)
55   {
56     return seen.remove(partition);
57   }
58
59
60   public Map<Integer, Map<String, Map<String, Long>>> getSeen()
61   {
62     return seen;
63   }
64 }