X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationRecordHandler.java;fp=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationRecordHandler.java;h=829ab0e0cc49b1f5aef5155f394b3902b6a9531e;hb=7c8f7f58d35e49324f0b2b87c0bdaafcb5b7fcc0;hp=51d524fc682beb6a8998075a55bf037c7c25ee51;hpb=21d88d8ede95d1f811ff91d3804cba6d95ae6aab;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java index 51d524f..829ab0e 100644 --- a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java +++ b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java @@ -12,7 +12,7 @@ import java.util.Optional; @RequiredArgsConstructor @Slf4j -public class ApplicationRecordHandler implements RecordHandler +public class ApplicationRecordHandler implements RecordHandler { private final AdderResults results; private final Optional throttle; @@ -22,21 +22,24 @@ public class ApplicationRecordHandler implements RecordHandler @Override - public void accept(ConsumerRecord record) + public void accept(ConsumerRecord record) { Integer partition = record.partition(); String user = record.key(); - String message = record.value(); + Message message = record.value(); - if (message.equals("CALCULATE")) + switch(message.getType()) { - AdderResult result = state.get(partition).calculate(user); - log.info("{} - New result for {}: {}", id, user, result); - results.addResults(partition, user, result); - } - else - { - state.get(partition).addToSum(user, Integer.parseInt(message)); + case ADD: + MessageAddNumber addNumber = (MessageAddNumber)message; + state.get(partition).addToSum(user, addNumber.getNext()); + break; + + case CALC: + AdderResult result = state.get(partition).calculate(user); + log.info("{} - New result for {}: {}", id, user, result); + results.addResults(partition, user, result); + break; } if (throttle.isPresent())