semaphore.acquire();
CompletableFuture<RecordMetadata> completableFuture = CompletableFuture
.supplyAsync(() ->
+ {
+ Future<RecordMetadata> recordMetadataFuture = producer.send(record);
+ long sendRequestQueued = System.currentTimeMillis();
+ semaphore.release();
+ log.trace(
+ "{} - Queued message {}={}, latency={}ms",
+ id,
+ key,
+ value,
+ sendRequestQueued - sendRequested
+ );
+ return recordMetadataFuture;
+ }, executor)
+ .thenApplyAsync(recordMetadataFuture ->
{
try
{
- Future<RecordMetadata> result = producer.send(record);
- long sendRequestQueued = System.currentTimeMillis();
- semaphore.release();
- log.trace(
- "{} - Queued message {}={}, latency={}ms",
- id,
- key,
- value,
- sendRequestQueued - sendRequested
- );
- return result.get();
+ return recordMetadataFuture.get();
}
catch (Exception e)
{
throw new RuntimeException(e);
}
- }, executor);
+ });
completableFuture.whenComplete((metadata, e) ->
{