]> juplo.de Git - demos/kafka/training/commitdiff
Das Throttling kann über `juplo.producer.throttle-ms` gesteuert werden
authorKai Moritz <kai@juplo.de>
Mon, 28 Oct 2024 08:28:39 +0000 (09:28 +0100)
committerKai Moritz <kai@juplo.de>
Tue, 29 Oct 2024 17:06:38 +0000 (18:06 +0100)
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/ApplicationProperties.java
src/main/java/de/juplo/kafka/ExampleProducer.java
src/main/resources/application.yml

index 2491f092465ff3e81e520f709c063abcc8509f13..2be61daf55edbb23936ddbb06adc73d03c5d9ed8 100644 (file)
@@ -23,6 +23,9 @@ public class ApplicationConfiguration
         new ExampleProducer(
             properties.getClientId(),
             properties.getProducerProperties().getTopic(),
+            properties.getProducerProperties().getThrottleMs() == null
+              ? 500
+              : properties.getProducerProperties().getThrottleMs(),
             kafkaProducer);
   }
 
index 1f83246c49b3c3382fb7d26dbca8be4f48f3cfc9..5cb9aa0517c37979275bca2a2b6f9499bdb879ea 100644 (file)
@@ -49,5 +49,6 @@ public class ApplicationProperties
     @NotNull
     @NotEmpty
     private String compressionType;
+    private Integer throttleMs;
   }
 }
index 38bcb9fa7e58c086b461fb3bee32fed617df1fb8..6f7c093c13ef3b9357e9f78426ec5b7225877d61 100644 (file)
@@ -10,6 +10,7 @@ public class ExampleProducer implements Runnable
 {
   private final String id;
   private final String topic;
+  private final int throttleMs;
   private final Producer<String, String> producer;
   private final Thread workerThread;
 
@@ -20,10 +21,12 @@ public class ExampleProducer implements Runnable
   public ExampleProducer(
     String id,
     String topic,
+    int throttleMs,
     Producer<String, String> producer)
   {
     this.id = id;
     this.topic = topic;
+    this.throttleMs = throttleMs;
     this.producer = producer;
 
     workerThread = new Thread(this, "ExampleProducer Worker-Thread");
@@ -41,7 +44,18 @@ public class ExampleProducer implements Runnable
       for (; running; i++)
       {
         send(Long.toString(i%10), Long.toString(i));
-        Thread.sleep(500);
+
+        if (throttleMs > 0)
+        {
+          try
+          {
+            Thread.sleep(throttleMs);
+          }
+          catch (InterruptedException e)
+          {
+            log.warn("{} - Interrupted while throttling!", e);
+          }
+        }
       }
     }
     catch (Exception e)
index 97856483f50ad94b72c519d00808f5c68e75b414..85aee9d5c661e762de8dcffda20e6fff9af29713 100644 (file)
@@ -7,6 +7,7 @@ juplo:
     batch-size: 16384
     linger-ms: 0
     compression-type: gzip
+    throttle-ms: 500
 management:
   endpoint:
     shutdown:
@@ -30,6 +31,7 @@ info:
       batch-size: ${juplo.producer.batch-size}
       linger-ms: ${juplo.producer.linger-ms}
       compression-type: ${juplo.producer.compression-type}
+      throttle-ms: ${juplo.producer.throttle-ms}
 logging:
   level:
     root: INFO