X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FWordcountRecordHandler.java;fp=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FWordcountRecordHandler.java;h=0000000000000000000000000000000000000000;hb=5b4b7acf7b6a02e0e5c779257d3f5996366625e6;hp=4efc54726539c61c017d336ae07957c1b84fdefb;hpb=f83599b6aaefff62c286e2143bb2e8a81751e6fd;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/WordcountRecordHandler.java b/src/main/java/de/juplo/kafka/WordcountRecordHandler.java deleted file mode 100644 index 4efc547..0000000 --- a/src/main/java/de/juplo/kafka/WordcountRecordHandler.java +++ /dev/null @@ -1,64 +0,0 @@ -package de.juplo.kafka; - -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerRecord; - -import java.util.HashMap; -import java.util.Map; -import java.util.regex.Pattern; - - -@Slf4j -public class WordcountRecordHandler implements RecordHandler -{ - final static Pattern PATTERN = Pattern.compile("\\W+"); - - - private final Map>> seen = new HashMap<>(); - - - @Override - public void accept(ConsumerRecord record) - { - Integer partition = record.partition(); - String user = record.key(); - Map> users = seen.get(partition); - - Map words = users.get(user); - if (words == null) - { - words = new HashMap<>(); - users.put(user, words); - } - - for (String word : PATTERN.split(record.value())) - { - Long num = words.get(word); - if (num == null) - { - num = 1l; - } - else - { - num++; - } - words.put(word, num); - } - } - - public void addPartition(Integer partition, Map> statistics) - { - seen.put(partition, statistics); - } - - public Map> removePartition(Integer partition) - { - return seen.remove(partition); - } - - - public Map>> getSeen() - { - return seen; - } -}