From 5d9ac6b3b58d1e635d4f83a2a3162626b6c57da8 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Fri, 16 Sep 2022 12:10:38 +0200 Subject: [PATCH] Throttling aus dem `ApplicationRecordHandler` entfernt MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit * Den Mechanismus zum künstlichen Ausbremsen der Verarbeitung und die zugehörige Konfigurations-Option aus dem `ApplicationRecordHandler` entfernt, um zu einer möglichst einfachen Anfangs-Lösung zu gelangen. --- src/main/java/de/juplo/kafka/Application.java | 5 +---- .../de/juplo/kafka/ApplicationProperties.java | 2 -- .../juplo/kafka/ApplicationRecordHandler.java | 19 ------------------- 3 files changed, 1 insertion(+), 25 deletions(-) diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java index 69a9712..d61009e 100644 --- a/src/main/java/de/juplo/kafka/Application.java +++ b/src/main/java/de/juplo/kafka/Application.java @@ -21,7 +21,6 @@ import org.springframework.kafka.support.serializer.JsonSerializer; import org.springframework.util.backoff.FixedBackOff; import java.util.Map; -import java.util.Optional; @SpringBootApplication @@ -33,12 +32,10 @@ public class Application @Bean public ApplicationRecordHandler applicationRecordHandler( AdderResults adderResults, - KafkaProperties kafkaProperties, - ApplicationProperties applicationProperties) + KafkaProperties kafkaProperties) { return new ApplicationRecordHandler( adderResults, - Optional.ofNullable(applicationProperties.getThrottle()), kafkaProperties.getConsumer().getGroupId()); } diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java index 005460c..940ed5b 100644 --- a/src/main/java/de/juplo/kafka/ApplicationProperties.java +++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java @@ -7,7 +7,6 @@ import org.springframework.validation.annotation.Validated; import javax.validation.constraints.NotEmpty; import javax.validation.constraints.NotNull; -import java.time.Duration; @ConfigurationProperties(prefix = "sumup.adder") @@ -19,5 +18,4 @@ public class ApplicationProperties @NotNull @NotEmpty private String topic; - private Duration throttle; } diff --git a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java index 2075781..f082314 100644 --- a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java +++ b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java @@ -8,10 +8,8 @@ import org.springframework.kafka.support.KafkaHeaders; import org.springframework.messaging.handler.annotation.Header; import org.springframework.messaging.handler.annotation.Payload; -import java.time.Duration; import java.util.HashMap; import java.util.Map; -import java.util.Optional; @RequiredArgsConstructor @@ -22,7 +20,6 @@ import java.util.Optional; public class ApplicationRecordHandler { private final AdderResults results; - private final Optional throttle; private final String id; private final Map state = new HashMap<>(); @@ -39,7 +36,6 @@ public class ApplicationRecordHandler { log.debug("{} - Received {} for {} on {}", id, message, user, partition); state.get(partition).addToSum(user, message.getNext()); - throttle(); } @KafkaHandler @@ -54,23 +50,8 @@ public class ApplicationRecordHandler AdderResult result = state.get(partition).calculate(user); log.info("{} - New result for {}: {}", id, user, result); results.addResults(partition, user, result); - throttle(); } - private void throttle() - { - if (throttle.isPresent()) - { - try - { - Thread.sleep(throttle.get().toMillis()); - } - catch (InterruptedException e) - { - log.warn("{} - Intrerrupted while throttling: {}", id, e); - } - } - } protected void addPartition(Integer partition, Map state) { -- 2.20.1