From: Kai Moritz Date: Mon, 28 Oct 2024 08:28:39 +0000 (+0100) Subject: Das Throttling kann über `juplo.producer.throttle-ms` gesteuert werden X-Git-Tag: producer/spring-producer--fixedsharding--null~2 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=3d3bdef540f16bb0e86f469bdd8255c567313604;p=demos%2Fkafka%2Ftraining Das Throttling kann über `juplo.producer.throttle-ms` gesteuert werden --- diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 2491f09..2be61da 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -23,6 +23,9 @@ public class ApplicationConfiguration new ExampleProducer( properties.getClientId(), properties.getProducerProperties().getTopic(), + properties.getProducerProperties().getThrottleMs() == null + ? 500 + : properties.getProducerProperties().getThrottleMs(), kafkaProducer); } diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java index 1f83246..5cb9aa0 100644 --- a/src/main/java/de/juplo/kafka/ApplicationProperties.java +++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java @@ -49,5 +49,6 @@ public class ApplicationProperties @NotNull @NotEmpty private String compressionType; + private Integer throttleMs; } } diff --git a/src/main/java/de/juplo/kafka/ExampleProducer.java b/src/main/java/de/juplo/kafka/ExampleProducer.java index 38bcb9f..6f7c093 100644 --- a/src/main/java/de/juplo/kafka/ExampleProducer.java +++ b/src/main/java/de/juplo/kafka/ExampleProducer.java @@ -10,6 +10,7 @@ public class ExampleProducer implements Runnable { private final String id; private final String topic; + private final int throttleMs; private final Producer producer; private final Thread workerThread; @@ -20,10 +21,12 @@ public class ExampleProducer implements Runnable public ExampleProducer( String id, String topic, + int throttleMs, Producer producer) { this.id = id; this.topic = topic; + this.throttleMs = throttleMs; this.producer = producer; workerThread = new Thread(this, "ExampleProducer Worker-Thread"); @@ -41,7 +44,18 @@ public class ExampleProducer implements Runnable for (; running; i++) { send(Long.toString(i%10), Long.toString(i)); - Thread.sleep(500); + + if (throttleMs > 0) + { + try + { + Thread.sleep(throttleMs); + } + catch (InterruptedException e) + { + log.warn("{} - Interrupted while throttling!", e); + } + } } } catch (Exception e) diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 9785648..85aee9d 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -7,6 +7,7 @@ juplo: batch-size: 16384 linger-ms: 0 compression-type: gzip + throttle-ms: 500 management: endpoint: shutdown: @@ -30,6 +31,7 @@ info: batch-size: ${juplo.producer.batch-size} linger-ms: ${juplo.producer.linger-ms} compression-type: ${juplo.producer.compression-type} + throttle-ms: ${juplo.producer.throttle-ms} logging: level: root: INFO