X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationRecordHandler.java;h=28291579fd85c7efeb885344bce3e9c76fc13db1;hb=ce92719ebd18b3c34a3aa8ca60cf67f6c3fbd8b2;hp=d0d385ca9f3ed0f94da4a5af058f1da0adc8a19e;hpb=a2e8fc924e5b472d6b90c42d311514f91ea452f1;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java index d0d385c..2829157 100644 --- a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java +++ b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java @@ -1,47 +1,81 @@ package de.juplo.kafka; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; +import java.time.Duration; import java.util.HashMap; import java.util.Map; +import java.util.Optional; +@RequiredArgsConstructor @Slf4j -public class ApplicationRecordHandler implements RecordHandler +public class ApplicationRecordHandler implements RecordHandler { + private final AdderResults results; + private final Optional throttle; + private final String id; + private final Map state = new HashMap<>(); + public void addNumber( + Integer partition, + String user, + MessageAddNumber message) + { + state.get(partition).addToSum(user, message.getNext()); + } + + public void calculateSum( + Integer partition, + String user, + MessageCalculateSum message) + { + AdderResult result = state.get(partition).calculate(user); + log.info("{} - New result for {}: {}", id, user, result); + results.addResults(partition, user, result); + } + @Override - public void accept(ConsumerRecord record) + public void accept(ConsumerRecord record) { Integer partition = record.partition(); String user = record.key(); - String message = record.value(); - switch (message) + Message message = record.value(); + + switch(message.getType()) { - case "START": - state.get(partition).startSum(user); + case ADD: + addNumber(partition, user, (MessageAddNumber) message); break; - case "END": - Long result = state.get(partition).endSum(user); - log.info("New result for {}: {}", user, result); + case CALC: + calculateSum(partition, user, (MessageCalculateSum) message); break; + } - default: - state.get(partition).addToSum(user, Integer.parseInt(message)); - break; + if (throttle.isPresent()) + { + try + { + Thread.sleep(throttle.get().toMillis()); + } + catch (InterruptedException e) + { + log.warn("{} - Intrerrupted while throttling: {}", id, e); + } } } - protected void addPartition(Integer partition, Map state) + protected void addPartition(Integer partition, Map state) { this.state.put(partition, new AdderBusinessLogic(state)); } - protected Map removePartition(Integer partition) + protected Map removePartition(Integer partition) { return this.state.remove(partition).getState(); } @@ -51,4 +85,9 @@ public class ApplicationRecordHandler implements RecordHandler { return state; } + + public AdderBusinessLogic getState(Integer partition) + { + return state.get(partition); + } }