import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
+import java.time.Duration;
import java.util.Properties;
new ExampleProducer(
properties.getClientId(),
properties.getProducerProperties().getTopic(),
+ properties.getProducerProperties().getThrottle() == null
+ ? Duration.ofMillis(500)
+ : properties.getProducerProperties().getThrottle(),
kafkaProducer);
}
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
+import java.time.Duration;
+
@Slf4j
public class ExampleProducer implements Runnable
{
private final String id;
private final String topic;
+ private final Duration throttle;
private final Producer<String, String> producer;
private final Thread workerThread;
public ExampleProducer(
String id,
String topic,
+ Duration throttle,
Producer<String, String> producer)
{
this.id = id;
this.topic = topic;
+ this.throttle = throttle;
this.producer = producer;
workerThread = new Thread(this, "ExampleProducer Worker-Thread");
for (; running; i++)
{
send(Long.toString(i%10), Long.toString(i));
- Thread.sleep(500);
+
+ if (throttle.isPositive())
+ {
+ try
+ {
+ Thread.sleep(throttle);
+ }
+ catch (InterruptedException e)
+ {
+ log.warn("{} - Interrupted while throttling!", e);
+ }
+ }
}
}
catch (Exception e)
batch-size: 16384
linger: 0
compression-type: gzip
+ throttle: 500
management:
endpoint:
shutdown:
batch-size: ${juplo.producer.batch-size}
linger: ${juplo.producer.linger}
compression-type: ${juplo.producer.compression-type}
+ throttle: ${juplo.producer.throttle}
logging:
level:
root: INFO