import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
@Slf4j
value // Value
);
- producer.send(record, (metadata, e) ->
+ CompletableFuture<RecordMetadata> completableFuture = CompletableFuture.supplyAsync(() ->
+ {
+ try
+ {
+ return producer.send(record).get();
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ });
+
+ completableFuture.whenComplete((metadata, e) ->
{
long now = System.currentTimeMillis();
if (e == null)