X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationRecordHandler.java;fp=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationRecordHandler.java;h=ce340a76a96eaec73853fb42d67fd60e7a62e665;hb=d576eea9bc9208d9e5003bd8c8c132bed96b5c40;hp=d0d385ca9f3ed0f94da4a5af058f1da0adc8a19e;hpb=0cd07d4498de934aefece33e20eee0df684e62e5;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java index d0d385c..ce340a7 100644 --- a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java +++ b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java @@ -19,21 +19,15 @@ public class ApplicationRecordHandler implements RecordHandler Integer partition = record.partition(); String user = record.key(); String message = record.value(); - switch (message) + + if (message.equals("CALCULATE")) { - case "START": - state.get(partition).startSum(user); - break; - - case "END": - Long result = state.get(partition).endSum(user); - log.info("New result for {}: {}", user, result); - break; - - default: - state.get(partition).addToSum(user, Integer.parseInt(message)); - break; + Long result = state.get(partition).calculate(user); + log.info("New result for {}: {}", user, result); + return; } + + state.get(partition).addToSum(user, Integer.parseInt(message)); } protected void addPartition(Integer partition, Map state)