From ad4ed61abeb48124f4db65687ede71f5bc943f27 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Mon, 22 Aug 2022 18:24:11 +0200 Subject: [PATCH] =?utf8?q?Konfig-Parameter=20zum=20k=C3=BCnstlichen=20Verz?= =?utf8?q?=C3=B6gern=20der=20Verabeitung=20eingebaut?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- docker-compose.yml | 4 ++++ .../juplo/kafka/ApplicationConfiguration.java | 9 +++++++-- .../de/juplo/kafka/ApplicationProperties.java | 1 + .../juplo/kafka/ApplicationRecordHandler.java | 20 +++++++++++++++++-- 4 files changed, 30 insertions(+), 4 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index 96fda60..c46b00d 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -132,6 +132,7 @@ services: sumup.adder.bootstrap-server: kafka:9092 sumup.adder.client-id: adder-1 sumup.adder.commit-interval: 3s + sumup.adder.throttle: 3ms spring.data.mongodb.uri: mongodb://juplo:training@mongo:27017 spring.data.mongodb.database: juplo logging.level.org.apache.kafka.clients.consumer: DEBUG @@ -145,6 +146,7 @@ services: sumup.adder.bootstrap-server: kafka:9092 sumup.adder.client-id: adder-2 sumup.adder.commit-interval: 3s + sumup.adder.throttle: 3ms spring.data.mongodb.uri: mongodb://juplo:training@mongo:27017 spring.data.mongodb.database: juplo logging.level.org.apache.kafka.clients.consumer: DEBUG @@ -156,6 +158,7 @@ services: while [[ true ]]; do echo 666 | http -v gateway:8080/peter; + sleep 1; done " klaus: @@ -165,5 +168,6 @@ services: while [[ true ]]; do echo 666 | http -v gateway:8080/klaus; + sleep 1; done " diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index c1bc019..4d056c4 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -7,6 +7,7 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.time.Clock; +import java.util.Optional; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -17,9 +18,13 @@ import java.util.concurrent.Executors; public class ApplicationConfiguration { @Bean - public ApplicationRecordHandler recordHandler(AdderResults adderResults) + public ApplicationRecordHandler recordHandler( + AdderResults adderResults, + ApplicationProperties properties) { - return new ApplicationRecordHandler(adderResults); + return new ApplicationRecordHandler( + adderResults, + Optional.ofNullable(properties.getThrottle())); } @Bean diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java index 410c623..f852c00 100644 --- a/src/main/java/de/juplo/kafka/ApplicationProperties.java +++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java @@ -33,4 +33,5 @@ public class ApplicationProperties private String autoOffsetReset; @NotNull private Duration commitInterval; + private Duration throttle; } diff --git a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java index 596f3da..0f5b982 100644 --- a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java +++ b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java @@ -4,8 +4,10 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; +import java.time.Duration; import java.util.HashMap; import java.util.Map; +import java.util.Optional; @RequiredArgsConstructor @@ -13,6 +15,7 @@ import java.util.Map; public class ApplicationRecordHandler implements RecordHandler { private final AdderResults results; + private final Optional throttle; private final Map state = new HashMap<>(); @@ -29,10 +32,23 @@ public class ApplicationRecordHandler implements RecordHandler AdderResult result = state.get(partition).calculate(user); log.info("New result for {}: {}", user, result); results.addResults(partition, user, result); - return; + } + else + { + state.get(partition).addToSum(user, Integer.parseInt(message)); } - state.get(partition).addToSum(user, Integer.parseInt(message)); + if (throttle.isPresent()) + { + try + { + Thread.sleep(throttle.get().toMillis()); + } + catch (InterruptedException e) + { + log.warn("Intrerrupted while throttling: {}", e); + } + } } protected void addPartition(Integer partition, Map state) -- 2.20.1