X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationRecordHandler.java;h=0f5b982ccdda3e7d2cc84154ee8bb62754d2a03d;hb=ad4ed61abeb48124f4db65687ede71f5bc943f27;hp=596f3da256d9d1e9a052ea43fe2d21c674db94cd;hpb=4555c5783f1d27e896c54591bfb6d433beb57556;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java index 596f3da..0f5b982 100644 --- a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java +++ b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java @@ -4,8 +4,10 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; +import java.time.Duration; import java.util.HashMap; import java.util.Map; +import java.util.Optional; @RequiredArgsConstructor @@ -13,6 +15,7 @@ import java.util.Map; public class ApplicationRecordHandler implements RecordHandler { private final AdderResults results; + private final Optional throttle; private final Map state = new HashMap<>(); @@ -29,10 +32,23 @@ public class ApplicationRecordHandler implements RecordHandler AdderResult result = state.get(partition).calculate(user); log.info("New result for {}: {}", user, result); results.addResults(partition, user, result); - return; + } + else + { + state.get(partition).addToSum(user, Integer.parseInt(message)); } - state.get(partition).addToSum(user, Integer.parseInt(message)); + if (throttle.isPresent()) + { + try + { + Thread.sleep(throttle.get().toMillis()); + } + catch (InterruptedException e) + { + log.warn("Intrerrupted while throttling: {}", e); + } + } } protected void addPartition(Integer partition, Map state)