Das Throttling kann über `juplo.producer.throttle-ms` gesteuert werden
authorKai Moritz <kai@juplo.de>
Mon, 28 Oct 2024 08:28:39 +0000 (09:28 +0100)
committerKai Moritz <kai@juplo.de>
Fri, 8 Nov 2024 17:06:03 +0000 (18:06 +0100)
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/ApplicationProperties.java
src/main/java/de/juplo/kafka/ExampleProducer.java
src/main/resources/application.yml

index ff9170d..d251427 100644 (file)
@@ -7,6 +7,7 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
+import java.time.Duration;
 import java.util.Properties;
 
 
@@ -23,6 +24,9 @@ public class ApplicationConfiguration
         new ExampleProducer(
             properties.getClientId(),
             properties.getProducerProperties().getTopic(),
+            properties.getProducerProperties().getThrottle() == null
+              ? Duration.ofMillis(500)
+              : properties.getProducerProperties().getThrottle(),
             kafkaProducer);
   }
 
index 16c2831..ea7622e 100644 (file)
@@ -51,5 +51,6 @@ public class ApplicationProperties
     @NotNull
     @NotEmpty
     private String compressionType;
+    private Duration throttle;
   }
 }
index 2f64f03..0842faa 100644 (file)
@@ -4,12 +4,15 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 
+import java.time.Duration;
+
 
 @Slf4j
 public class ExampleProducer implements Runnable
 {
   private final String id;
   private final String topic;
+  private final Duration throttle;
   private final Producer<String, String> producer;
   private final Thread workerThread;
 
@@ -20,10 +23,12 @@ public class ExampleProducer implements Runnable
   public ExampleProducer(
     String id,
     String topic,
+    Duration throttle,
     Producer<String, String> producer)
   {
     this.id = id;
     this.topic = topic;
+    this.throttle = throttle;
     this.producer = producer;
 
     workerThread = new Thread(this, "ExampleProducer Worker-Thread");
@@ -41,7 +46,18 @@ public class ExampleProducer implements Runnable
       for (; running; i++)
       {
         send(Long.toString(i%10), Long.toString(i));
-        Thread.sleep(500);
+
+        if (throttle.isPositive())
+        {
+          try
+          {
+            Thread.sleep(throttle);
+          }
+          catch (InterruptedException e)
+          {
+            log.warn("{} - Interrupted while throttling!", e);
+          }
+        }
       }
     }
     catch (Exception e)
index 0e28aba..cb9930a 100644 (file)
@@ -7,6 +7,7 @@ juplo:
     batch-size: 16384
     linger: 0
     compression-type: gzip
+    throttle: 500
 management:
   endpoint:
     shutdown:
@@ -30,6 +31,7 @@ info:
       batch-size: ${juplo.producer.batch-size}
       linger: ${juplo.producer.linger}
       compression-type: ${juplo.producer.compression-type}
+      throttle: ${juplo.producer.throttle}
 logging:
   level:
     root: INFO