X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FSumRecordHandler.java;fp=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FSumRecordHandler.java;h=82ada38855856f66d3381a8ff8ab135aea031fd6;hb=c808810e9e33afe33b29f7fd3921023ecd15483d;hp=0000000000000000000000000000000000000000;hpb=0b6377951bbeeb2443415bc2aa9c68988ee4c5af;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/SumRecordHandler.java b/src/main/java/de/juplo/kafka/SumRecordHandler.java new file mode 100644 index 0000000..82ada38 --- /dev/null +++ b/src/main/java/de/juplo/kafka/SumRecordHandler.java @@ -0,0 +1,64 @@ +package de.juplo.kafka; + +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; + +import java.util.HashMap; +import java.util.Map; +import java.util.regex.Pattern; + + +@Slf4j +public class SumRecordHandler implements RecordHandler +{ + final static Pattern PATTERN = Pattern.compile("\\W+"); + + + private final Map>> seen = new HashMap<>(); + + + @Override + public void accept(ConsumerRecord record) + { + Integer partition = record.partition(); + String user = record.key(); + Map> users = seen.get(partition); + + Map words = users.get(user); + if (words == null) + { + words = new HashMap<>(); + users.put(user, words); + } + + for (String word : PATTERN.split(record.value())) + { + Long num = words.get(word); + if (num == null) + { + num = 1l; + } + else + { + num++; + } + words.put(word, num); + } + } + + public void addPartition(Integer partition, Map> statistics) + { + seen.put(partition, statistics); + } + + public Map> removePartition(Integer partition) + { + return seen.remove(partition); + } + + + public Map>> getSeen() + { + return seen; + } +}