X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FSumRecordHandler.java;fp=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FSumRecordHandler.java;h=b0fd27bfde983a83c94ac52fca894289331e7c98;hb=3495017ed2116f338c3342a313abdb7170683573;hp=d4ec38faf0d0c733076bff6db4a3d597099e883a;hpb=5f35354fd694f78599d66ee9e01fb4c0d89cc5bb;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/SumRecordHandler.java b/src/main/java/de/juplo/kafka/SumRecordHandler.java index d4ec38f..b0fd27b 100644 --- a/src/main/java/de/juplo/kafka/SumRecordHandler.java +++ b/src/main/java/de/juplo/kafka/SumRecordHandler.java @@ -6,11 +6,12 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; + @Slf4j public class SumRecordHandler implements RecordHandler { - private final Map>> seen = new HashMap<>(); private final Map state = new HashMap<>(); @@ -24,36 +25,32 @@ public class SumRecordHandler implements RecordHandler { case "START": state.get(partition).startSum(user); - return; + break; case "END": Long result = state.get(partition).endSum(user); log.info("New result for {}: {}", user, result); - return; + break; default: state.get(partition).addToSum(user, Integer.parseInt(message)); - return; + break; } } - protected void addPartition(Integer partition, StateDocument document) + protected void addPartition(Integer partition, Map state) { - this.seen.put(partition, document.seen); - this.state.put(partition, new SumBusinessLogic(document.state)); + this.state.put(partition, new SumBusinessLogic(state)); } - protected StateDocument removePartition(Integer partition) + protected Map removePartition(Integer partition) { - return new StateDocument( - partition, - this.state.remove(partition).getState(), - this.seen.remove(partition)); + return this.state.remove(partition).getState(); } - public Map>> getSeen() + public Map getState() { - return seen; + return state; } }