new ExampleProducer(
properties.getClientId(),
properties.getProducerProperties().getTopic(),
+ properties.getProducerProperties().getThrottleMs() == null
+ ? 500
+ : properties.getProducerProperties().getThrottleMs(),
kafkaProducer);
}
{
private final String id;
private final String topic;
+ private final int throttleMs;
private final Producer<String, String> producer;
private final Thread workerThread;
public ExampleProducer(
String id,
String topic,
+ int throttleMs,
Producer<String, String> producer)
{
this.id = id;
this.topic = topic;
+ this.throttleMs = throttleMs;
this.producer = producer;
workerThread = new Thread(this, "ExampleProducer Worker-Thread");
for (; running; i++)
{
send(Long.toString(i%10), Long.toString(i));
- Thread.sleep(500);
+
+ if (throttleMs > 0)
+ {
+ try
+ {
+ Thread.sleep(throttleMs);
+ }
+ catch (InterruptedException e)
+ {
+ log.warn("{} - Interrupted while throttling!", e);
+ }
+ }
}
}
catch (Exception e)
batch-size: 16384
linger-ms: 0
compression-type: gzip
+ throttle-ms: 500
management:
endpoint:
shutdown:
batch-size: ${juplo.producer.batch-size}
linger-ms: ${juplo.producer.linger-ms}
compression-type: ${juplo.producer.compression-type}
+ throttle-ms: ${juplo.producer.throttle-ms}
logging:
level:
root: INFO