X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationRecordHandler.java;h=f4d3671ae6da4208a551271f3b19750213e07ce8;hb=0c9a0c1cf9a0065012743efcd940d8721bc33c20;hp=28291579fd85c7efeb885344bce3e9c76fc13db1;hpb=caed9441a9303af071a572405ae4a665d60faae7;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java index 2829157..f4d3671 100644 --- a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java +++ b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java @@ -2,7 +2,6 @@ package de.juplo.kafka; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerRecord; import java.time.Duration; import java.util.HashMap; @@ -12,7 +11,7 @@ import java.util.Optional; @RequiredArgsConstructor @Slf4j -public class ApplicationRecordHandler implements RecordHandler +public class ApplicationRecordHandler implements RecordHandler { private final AdderResults results; private final Optional throttle; @@ -21,42 +20,34 @@ public class ApplicationRecordHandler implements RecordHandler private final Map state = new HashMap<>(); + @Override public void addNumber( + String topic, Integer partition, + Long offset, String user, MessageAddNumber message) { state.get(partition).addToSum(user, message.getNext()); + throttle(); } + @Override public void calculateSum( + String topic, Integer partition, + Long offset, String user, MessageCalculateSum message) { AdderResult result = state.get(partition).calculate(user); log.info("{} - New result for {}: {}", id, user, result); results.addResults(partition, user, result); + throttle(); } - @Override - public void accept(ConsumerRecord record) + private void throttle() { - Integer partition = record.partition(); - String user = record.key(); - Message message = record.value(); - - switch(message.getType()) - { - case ADD: - addNumber(partition, user, (MessageAddNumber) message); - break; - - case CALC: - calculateSum(partition, user, (MessageCalculateSum) message); - break; - } - if (throttle.isPresent()) { try