From 3d3bdef540f16bb0e86f469bdd8255c567313604 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Mon, 28 Oct 2024 09:28:39 +0100 Subject: [PATCH] =?utf8?q?Das=20Throttling=20kann=20=C3=BCber=20`juplo.pro?= =?utf8?q?ducer.throttle-ms`=20gesteuert=20werden?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- .../de/juplo/kafka/ApplicationConfiguration.java | 3 +++ .../de/juplo/kafka/ApplicationProperties.java | 1 + .../java/de/juplo/kafka/ExampleProducer.java | 16 +++++++++++++++- src/main/resources/application.yml | 2 ++ 4 files changed, 21 insertions(+), 1 deletion(-) diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 2491f09..2be61da 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -23,6 +23,9 @@ public class ApplicationConfiguration new ExampleProducer( properties.getClientId(), properties.getProducerProperties().getTopic(), + properties.getProducerProperties().getThrottleMs() == null + ? 500 + : properties.getProducerProperties().getThrottleMs(), kafkaProducer); } diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java index 1f83246..5cb9aa0 100644 --- a/src/main/java/de/juplo/kafka/ApplicationProperties.java +++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java @@ -49,5 +49,6 @@ public class ApplicationProperties @NotNull @NotEmpty private String compressionType; + private Integer throttleMs; } } diff --git a/src/main/java/de/juplo/kafka/ExampleProducer.java b/src/main/java/de/juplo/kafka/ExampleProducer.java index 38bcb9f..6f7c093 100644 --- a/src/main/java/de/juplo/kafka/ExampleProducer.java +++ b/src/main/java/de/juplo/kafka/ExampleProducer.java @@ -10,6 +10,7 @@ public class ExampleProducer implements Runnable { private final String id; private final String topic; + private final int throttleMs; private final Producer producer; private final Thread workerThread; @@ -20,10 +21,12 @@ public class ExampleProducer implements Runnable public ExampleProducer( String id, String topic, + int throttleMs, Producer producer) { this.id = id; this.topic = topic; + this.throttleMs = throttleMs; this.producer = producer; workerThread = new Thread(this, "ExampleProducer Worker-Thread"); @@ -41,7 +44,18 @@ public class ExampleProducer implements Runnable for (; running; i++) { send(Long.toString(i%10), Long.toString(i)); - Thread.sleep(500); + + if (throttleMs > 0) + { + try + { + Thread.sleep(throttleMs); + } + catch (InterruptedException e) + { + log.warn("{} - Interrupted while throttling!", e); + } + } } } catch (Exception e) diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 9785648..85aee9d 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -7,6 +7,7 @@ juplo: batch-size: 16384 linger-ms: 0 compression-type: gzip + throttle-ms: 500 management: endpoint: shutdown: @@ -30,6 +31,7 @@ info: batch-size: ${juplo.producer.batch-size} linger-ms: ${juplo.producer.linger-ms} compression-type: ${juplo.producer.compression-type} + throttle-ms: ${juplo.producer.throttle-ms} logging: level: root: INFO -- 2.20.1