X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationRecordHandler.java;fp=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationRecordHandler.java;h=28291579fd85c7efeb885344bce3e9c76fc13db1;hb=ce92719ebd18b3c34a3aa8ca60cf67f6c3fbd8b2;hp=829ab0e0cc49b1f5aef5155f394b3902b6a9531e;hpb=813117d66c623509619dd543f353a41960cc48db;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java index 829ab0e..2829157 100644 --- a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java +++ b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java @@ -21,6 +21,24 @@ public class ApplicationRecordHandler implements RecordHandler private final Map state = new HashMap<>(); + public void addNumber( + Integer partition, + String user, + MessageAddNumber message) + { + state.get(partition).addToSum(user, message.getNext()); + } + + public void calculateSum( + Integer partition, + String user, + MessageCalculateSum message) + { + AdderResult result = state.get(partition).calculate(user); + log.info("{} - New result for {}: {}", id, user, result); + results.addResults(partition, user, result); + } + @Override public void accept(ConsumerRecord record) { @@ -31,14 +49,11 @@ public class ApplicationRecordHandler implements RecordHandler switch(message.getType()) { case ADD: - MessageAddNumber addNumber = (MessageAddNumber)message; - state.get(partition).addToSum(user, addNumber.getNext()); + addNumber(partition, user, (MessageAddNumber) message); break; case CALC: - AdderResult result = state.get(partition).calculate(user); - log.info("{} - New result for {}: {}", id, user, result); - results.addResults(partition, user, result); + calculateSum(partition, user, (MessageCalculateSum) message); break; }