X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationRecordHandler.java;h=51d524fc682beb6a8998075a55bf037c7c25ee51;hb=21d88d8ede95d1f811ff91d3804cba6d95ae6aab;hp=93e12976b9e665d5c359f39fd55ba289588ca8ce;hpb=7c0368363c3e5dbb7eb2a08f343187a93f050617;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java index 93e1297..51d524f 100644 --- a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java +++ b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java @@ -1,15 +1,23 @@ package de.juplo.kafka; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; +import java.time.Duration; import java.util.HashMap; import java.util.Map; +import java.util.Optional; +@RequiredArgsConstructor @Slf4j public class ApplicationRecordHandler implements RecordHandler { + private final AdderResults results; + private final Optional throttle; + private final String id; + private final Map state = new HashMap<>(); @@ -23,11 +31,25 @@ public class ApplicationRecordHandler implements RecordHandler if (message.equals("CALCULATE")) { AdderResult result = state.get(partition).calculate(user); - log.info("New result for {}: {}", user, result); - return; + log.info("{} - New result for {}: {}", id, user, result); + results.addResults(partition, user, result); + } + else + { + state.get(partition).addToSum(user, Integer.parseInt(message)); } - state.get(partition).addToSum(user, Integer.parseInt(message)); + if (throttle.isPresent()) + { + try + { + Thread.sleep(throttle.get().toMillis()); + } + catch (InterruptedException e) + { + log.warn("{} - Intrerrupted while throttling: {}", id, e); + } + } } protected void addPartition(Integer partition, Map state) @@ -45,4 +67,9 @@ public class ApplicationRecordHandler implements RecordHandler { return state; } + + public AdderBusinessLogic getState(Integer partition) + { + return state.get(partition); + } }