--- /dev/null
+package de.juplo.kafka;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+
+@Slf4j
+public class SumRecordHandler implements RecordHandler<String, String>
+{
+ final static Pattern PATTERN = Pattern.compile("\\W+");
+
+
+ private final Map<Integer, Map<String, Map<String, Long>>> seen = new HashMap<>();
+
+
+ @Override
+ public void accept(ConsumerRecord<String, String> record)
+ {
+ Integer partition = record.partition();
+ String user = record.key();
+ Map<String, Map<String, Long>> users = seen.get(partition);
+
+ Map<String, Long> words = users.get(user);
+ if (words == null)
+ {
+ words = new HashMap<>();
+ users.put(user, words);
+ }
+
+ for (String word : PATTERN.split(record.value()))
+ {
+ Long num = words.get(word);
+ if (num == null)
+ {
+ num = 1l;
+ }
+ else
+ {
+ num++;
+ }
+ words.put(word, num);
+ }
+ }
+
+ public void addPartition(Integer partition, Map<String, Map<String, Long>> statistics)
+ {
+ seen.put(partition, statistics);
+ }
+
+ public Map<String, Map<String, Long>> removePartition(Integer partition)
+ {
+ return seen.remove(partition);
+ }
+
+
+ public Map<Integer, Map<String, Map<String, Long>>> getSeen()
+ {
+ return seen;
+ }
+}