X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FSumRecordHandler.java;h=b0fd27bfde983a83c94ac52fca894289331e7c98;hb=43be1c56d6205a70bc2740ef105f329d5e7461cc;hp=82ada38855856f66d3381a8ff8ab135aea031fd6;hpb=c808810e9e33afe33b29f7fd3921023ecd15483d;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/SumRecordHandler.java b/src/main/java/de/juplo/kafka/SumRecordHandler.java index 82ada38..b0fd27b 100644 --- a/src/main/java/de/juplo/kafka/SumRecordHandler.java +++ b/src/main/java/de/juplo/kafka/SumRecordHandler.java @@ -4,17 +4,15 @@ import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import java.util.HashMap; +import java.util.List; import java.util.Map; -import java.util.regex.Pattern; +import java.util.stream.Collectors; @Slf4j public class SumRecordHandler implements RecordHandler { - final static Pattern PATTERN = Pattern.compile("\\W+"); - - - private final Map>> seen = new HashMap<>(); + private final Map state = new HashMap<>(); @Override @@ -22,43 +20,37 @@ public class SumRecordHandler implements RecordHandler { Integer partition = record.partition(); String user = record.key(); - Map> users = seen.get(partition); - - Map words = users.get(user); - if (words == null) - { - words = new HashMap<>(); - users.put(user, words); - } - - for (String word : PATTERN.split(record.value())) + String message = record.value(); + switch (message) { - Long num = words.get(word); - if (num == null) - { - num = 1l; - } - else - { - num++; - } - words.put(word, num); + case "START": + state.get(partition).startSum(user); + break; + + case "END": + Long result = state.get(partition).endSum(user); + log.info("New result for {}: {}", user, result); + break; + + default: + state.get(partition).addToSum(user, Integer.parseInt(message)); + break; } } - public void addPartition(Integer partition, Map> statistics) + protected void addPartition(Integer partition, Map state) { - seen.put(partition, statistics); + this.state.put(partition, new SumBusinessLogic(state)); } - public Map> removePartition(Integer partition) + protected Map removePartition(Integer partition) { - return seen.remove(partition); + return this.state.remove(partition).getState(); } - public Map>> getSeen() + public Map getState() { - return seen; + return state; } }