Das Throttling kann über `juplo.producer.throttle-ms` gesteuert werden
authorKai Moritz <kai@juplo.de>
Mon, 28 Oct 2024 08:28:39 +0000 (09:28 +0100)
committerKai Moritz <kai@juplo.de>
Tue, 29 Oct 2024 17:06:38 +0000 (18:06 +0100)
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/ApplicationProperties.java
src/main/java/de/juplo/kafka/ExampleProducer.java
src/main/resources/application.yml

index 2491f09..2be61da 100644 (file)
@@ -23,6 +23,9 @@ public class ApplicationConfiguration
         new ExampleProducer(
             properties.getClientId(),
             properties.getProducerProperties().getTopic(),
+            properties.getProducerProperties().getThrottleMs() == null
+              ? 500
+              : properties.getProducerProperties().getThrottleMs(),
             kafkaProducer);
   }
 
index 1f83246..5cb9aa0 100644 (file)
@@ -49,5 +49,6 @@ public class ApplicationProperties
     @NotNull
     @NotEmpty
     private String compressionType;
+    private Integer throttleMs;
   }
 }
index 38bcb9f..6f7c093 100644 (file)
@@ -10,6 +10,7 @@ public class ExampleProducer implements Runnable
 {
   private final String id;
   private final String topic;
+  private final int throttleMs;
   private final Producer<String, String> producer;
   private final Thread workerThread;
 
@@ -20,10 +21,12 @@ public class ExampleProducer implements Runnable
   public ExampleProducer(
     String id,
     String topic,
+    int throttleMs,
     Producer<String, String> producer)
   {
     this.id = id;
     this.topic = topic;
+    this.throttleMs = throttleMs;
     this.producer = producer;
 
     workerThread = new Thread(this, "ExampleProducer Worker-Thread");
@@ -41,7 +44,18 @@ public class ExampleProducer implements Runnable
       for (; running; i++)
       {
         send(Long.toString(i%10), Long.toString(i));
-        Thread.sleep(500);
+
+        if (throttleMs > 0)
+        {
+          try
+          {
+            Thread.sleep(throttleMs);
+          }
+          catch (InterruptedException e)
+          {
+            log.warn("{} - Interrupted while throttling!", e);
+          }
+        }
       }
     }
     catch (Exception e)
index 9785648..85aee9d 100644 (file)
@@ -7,6 +7,7 @@ juplo:
     batch-size: 16384
     linger-ms: 0
     compression-type: gzip
+    throttle-ms: 500
 management:
   endpoint:
     shutdown:
@@ -30,6 +31,7 @@ info:
       batch-size: ${juplo.producer.batch-size}
       linger-ms: ${juplo.producer.linger-ms}
       compression-type: ${juplo.producer.compression-type}
+      throttle-ms: ${juplo.producer.throttle-ms}
 logging:
   level:
     root: INFO