X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FSumRecordHandler.java;fp=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FSumRecordHandler.java;h=d4ec38faf0d0c733076bff6db4a3d597099e883a;hb=5f35354fd694f78599d66ee9e01fb4c0d89cc5bb;hp=82ada38855856f66d3381a8ff8ab135aea031fd6;hpb=c808810e9e33afe33b29f7fd3921023ecd15483d;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/SumRecordHandler.java b/src/main/java/de/juplo/kafka/SumRecordHandler.java index 82ada38..d4ec38f 100644 --- a/src/main/java/de/juplo/kafka/SumRecordHandler.java +++ b/src/main/java/de/juplo/kafka/SumRecordHandler.java @@ -4,17 +4,14 @@ import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import java.util.HashMap; +import java.util.List; import java.util.Map; -import java.util.regex.Pattern; - @Slf4j public class SumRecordHandler implements RecordHandler { - final static Pattern PATTERN = Pattern.compile("\\W+"); - - - private final Map>> seen = new HashMap<>(); + private final Map>> seen = new HashMap<>(); + private final Map state = new HashMap<>(); @Override @@ -22,42 +19,40 @@ public class SumRecordHandler implements RecordHandler { Integer partition = record.partition(); String user = record.key(); - Map> users = seen.get(partition); - - Map words = users.get(user); - if (words == null) - { - words = new HashMap<>(); - users.put(user, words); - } - - for (String word : PATTERN.split(record.value())) + String message = record.value(); + switch (message) { - Long num = words.get(word); - if (num == null) - { - num = 1l; - } - else - { - num++; - } - words.put(word, num); + case "START": + state.get(partition).startSum(user); + return; + + case "END": + Long result = state.get(partition).endSum(user); + log.info("New result for {}: {}", user, result); + return; + + default: + state.get(partition).addToSum(user, Integer.parseInt(message)); + return; } } - public void addPartition(Integer partition, Map> statistics) + protected void addPartition(Integer partition, StateDocument document) { - seen.put(partition, statistics); + this.seen.put(partition, document.seen); + this.state.put(partition, new SumBusinessLogic(document.state)); } - public Map> removePartition(Integer partition) + protected StateDocument removePartition(Integer partition) { - return seen.remove(partition); + return new StateDocument( + partition, + this.state.remove(partition).getState(), + this.seen.remove(partition)); } - public Map>> getSeen() + public Map>> getSeen() { return seen; }