1 package de.juplo.kafka;
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.clients.consumer.ConsumerRecord;
6 import java.util.HashMap;
8 import java.util.regex.Pattern;
12 public class WordcountRecordHandler implements RecordHandler<String, String>
// Pre-compiled regular expression matching one or more non-word characters;
// accept() uses it to split the value of a record into individual words.
14 final static Pattern PATTERN = Pattern.compile("\\W+");
// Statistics held by this handler, nested three levels deep:
// partition number -> record key (called "user" in accept()) -> word -> Long value.
// The Long is presumably the number of times the word was seen; the code that
// updates it is not visible in this excerpt.
17 private final Map<Integer, Map<String, Map<String, Long>>> seen = new HashMap<>();
/**
 * Handles a single consumed record: looks up the word map that belongs to the
 * partition and key of the record and walks over the words contained in the
 * value of the record.
 *
 * Only part of this method is visible in this excerpt; the statements that
 * update the per-word value are not shown.
 *
 * @param record the consumed record; its partition, key and value are read
 */
21 public void accept(ConsumerRecord<String, String> record)
// The partition number and the record key select the word map to work on.
23 Integer partition = record.partition();
24 String user = record.key();
25 Map<String, Map<String, Long>> users = seen.get(partition);
// NOTE(review): the result of seen.get(partition) is dereferenced here and no
// null check is visible, so this looks like it relies on addPartition() having
// been called for the partition beforehand -- confirm against the callers.
27 Map<String, Long> words = users.get(user);
// A fresh word map is created and registered under the key of the record.
// Presumably this only happens when none exists for that key yet -- the
// guarding condition is not visible in this excerpt.
30 words = new HashMap<>();
31 users.put(user, words);
// Split the value on runs of non-word characters and visit each piece.
34 for (String word : PATTERN.split(record.value()))
// Current value stored for the word; null if the word has no entry yet.
36 Long num = words.get(word);
/**
 * Registers the given statistics under the given partition number.
 *
 * The passed map is stored as-is (no copy is made) and replaces any map that
 * was previously stored for the same partition.
 *
 * @param partition the partition number used as the key
 * @param statistics the per-key word maps to store for that partition
 */
49 public void addPartition(Integer partition, Map<String, Map<String, Long>> statistics)
51 seen.put(partition, statistics);
/**
 * Removes the statistics stored for the given partition and hands them back.
 *
 * @param partition the partition number whose entry is removed
 * @return the map that was stored for the partition, or null if there was none
 */
54 public Map<String, Map<String, Long>> removePartition(Integer partition)
56 return seen.remove(partition);
60 public Map<Integer, Map<String, Map<String, Long>>> getSeen()