{
final long time = System.currentTimeMillis();
- CompletableFuture<SendResult<String, String>> completableFuture = kafkaTemplate.send(topic, key, value);
-
- completableFuture.thenAccept(result ->
+ kafkaTemplate.send(topic, key, value).whenComplete((result, e) ->
+ {
+ long now = System.currentTimeMillis();
+ if (e == null)
{
// HANDLE SUCCESS
- long now = System.currentTimeMillis();
RecordMetadata metadata = result.getRecordMetadata();
produced++;
log.debug(
metadata.timestamp(),
now - time
);
- });
-
- completableFuture.exceptionally(e ->
+ }
+ else
{
// HANDLE ERROR
- long now = System.currentTimeMillis();
log.error(
"{} - ERROR for message {}={}, latency={}ms: {}",
id,
now - time,
e.toString()
);
- return null;
- });
+ }
+ });
long now = System.currentTimeMillis();
log.trace(