new ArrayBlockingQueue<>(MAX_PENDING_MESSAGES),
new BlockingRejectedExecutionHandler()
);
+ private final Semaphore semaphore = new Semaphore(MAX_PENDING_MESSAGES);
private volatile boolean running = true;
private volatile boolean done = false;
private long produced = 0;
send(Long.toString(i%10), Long.toString(i));
Thread.sleep(500);
}
+ semaphore.acquire(MAX_PENDING_MESSAGES);
}
catch (Exception e)
{
}
}
- void send(String key, String value)
+ void send(String key, String value) throws InterruptedException
{
final long sendRequested = System.currentTimeMillis();
value // Value
);
+ semaphore.acquire();
CompletableFuture<RecordMetadata> completableFuture = CompletableFuture
.supplyAsync(() ->
{
{
Future<RecordMetadata> result = producer.send(record);
long sendRequestQueued = System.currentTimeMillis();
+ semaphore.release();
log.trace(
"{} - Queued message {}={}, latency={}ms",
id,