import org.apache.kafka.clients.producer.ProducerRecord;
import java.time.Duration;
+import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
private final String id;
private final String topic;
private final Duration throttle;
+ private final int maxQueueLength;
private final Producer<String, String> producer;
private final Thread workerThread;
private final Runnable closeCallback;
private volatile boolean running = true;
+ private final AtomicInteger queued = new AtomicInteger(0);
private long produced = 0;
String id,
String topic,
Duration throttle,
+ int maxQueueLength,
Producer<String, String> producer,
Runnable closeCallback)
{
this.id = id;
this.topic = topic;
this.throttle = throttle;
+ this.maxQueueLength = maxQueueLength;
this.producer = producer;
workerThread = new Thread(this, "ExampleProducer Worker-Thread");
void send(String key, String value)
{
+ while (queued.get() >= maxQueueLength)
+ {
+ try
+ {
+ Thread.sleep(1000);
+ }
+ catch (InterruptedException e)
+ {
+ log.warn("{} - Interrupted while waiting for queue to be progressed, queued={}!", queued, e);
+ }
+ }
+
final long sendRequested = System.currentTimeMillis();
final ProducerRecord<String, String> record = new ProducerRecord<>(
value // Value
);
+ int queuedAfterSend = queued.incrementAndGet();
+
producer.send(record, (metadata, e) ->
{
long sendRequestProcessed = System.currentTimeMillis();
+ int queuedAfterReceive = queued.decrementAndGet();
if (e == null)
{
// HANDLE SUCCESS
log.debug(
- "{} - Sent message {}={}, partition={}, offset={}, timestamp={}, latency={}ms",
+ "{} - Sent message {}={}, partition={}, offset={}, timestamp={}, latency={}ms, queued={}",
id,
key,
value,
metadata.partition(),
metadata.offset(),
metadata.timestamp(),
- sendRequestProcessed - sendRequested
+ sendRequestProcessed - sendRequested,
+ queuedAfterReceive
);
}
else
{
// HANDLE ERROR
log.error(
- "{} - ERROR for message {}={}, latency={}ms: {}",
+ "{} - ERROR for message {}={}, latency={}ms, queued={}: {}",
id,
key,
value,
sendRequestProcessed - sendRequested,
+ queuedAfterReceive,
e.toString()
);
}
long sendRequestQueued = System.currentTimeMillis();
produced++;
log.trace(
- "{} - Queued message {}={}, latency={}ms",
+ "{} - Queued message {}={}, latency={}ms, queued={}",
id,
key,
value,
- sendRequestQueued - sendRequested
+ sendRequestQueued - sendRequested,
+ queuedAfterSend
);
}