From 5ed85e5da13e32e6f26620a8da7bd4555408ca82 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Mon, 28 Oct 2024 09:28:39 +0100 Subject: [PATCH] =?utf8?q?Das=20Throttling=20kann=20=C3=BCber=20`juplo.pro?= =?utf8?q?ducer.throttle-ms`=20gesteuert=20werden?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- .../juplo/kafka/ApplicationConfiguration.java | 4 ++++ .../de/juplo/kafka/ApplicationProperties.java | 1 + .../java/de/juplo/kafka/ExampleProducer.java | 18 +++++++++++++++++- src/main/resources/application.yml | 2 ++ 4 files changed, 24 insertions(+), 1 deletion(-) diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index ff9170d..d251427 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -7,6 +7,7 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import java.time.Duration; import java.util.Properties; @@ -23,6 +24,9 @@ public class ApplicationConfiguration new ExampleProducer( properties.getClientId(), properties.getProducerProperties().getTopic(), + properties.getProducerProperties().getThrottle() == null + ? Duration.ofMillis(500) + : properties.getProducerProperties().getThrottle(), kafkaProducer); } diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java index 16c2831..ea7622e 100644 --- a/src/main/java/de/juplo/kafka/ApplicationProperties.java +++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java @@ -51,5 +51,6 @@ public class ApplicationProperties @NotNull @NotEmpty private String compressionType; + private Duration throttle; } } diff --git a/src/main/java/de/juplo/kafka/ExampleProducer.java b/src/main/java/de/juplo/kafka/ExampleProducer.java index 2f64f03..0842faa 100644 --- a/src/main/java/de/juplo/kafka/ExampleProducer.java +++ b/src/main/java/de/juplo/kafka/ExampleProducer.java @@ -4,12 +4,15 @@ import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; +import java.time.Duration; + @Slf4j public class ExampleProducer implements Runnable { private final String id; private final String topic; + private final Duration throttle; private final Producer producer; private final Thread workerThread; @@ -20,10 +23,12 @@ public class ExampleProducer implements Runnable public ExampleProducer( String id, String topic, + Duration throttle, Producer producer) { this.id = id; this.topic = topic; + this.throttle = throttle; this.producer = producer; workerThread = new Thread(this, "ExampleProducer Worker-Thread"); @@ -41,7 +46,18 @@ public class ExampleProducer implements Runnable for (; running; i++) { send(Long.toString(i%10), Long.toString(i)); - Thread.sleep(500); + + if (throttle.isPositive()) + { + try + { + Thread.sleep(throttle); + } + catch (InterruptedException e) + { + log.warn("{} - Interrupted while throttling!", e); + } + } } } catch (Exception e) diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 0e28aba..cb9930a 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -7,6 +7,7 @@ juplo: batch-size: 16384 linger: 0 compression-type: gzip + throttle: 500 management: endpoint: shutdown: @@ -30,6 +31,7 @@ info: batch-size: ${juplo.producer.batch-size} linger: ${juplo.producer.linger} compression-type: ${juplo.producer.compression-type} + throttle: ${juplo.producer.throttle} logging: level: root: INFO -- 2.20.1