From: Kai Moritz Date: Fri, 14 Mar 2025 09:21:52 +0000 (+0100) Subject: Ergebniskontrolle über das zurückgegebenen Future X-Git-Tag: grundlagen/simple-producer--completablefuture--2025-03-18--19-42~1 X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=7f5abe1772d77283f1ee34760fb3a81f5d551f5a;p=demos%2Fkafka%2Ftraining Ergebniskontrolle über das zurückgegebenen Future --- diff --git a/src/main/java/de/juplo/kafka/ExampleProducer.java b/src/main/java/de/juplo/kafka/ExampleProducer.java index 0bab4426..6c585bf7 100644 --- a/src/main/java/de/juplo/kafka/ExampleProducer.java +++ b/src/main/java/de/juplo/kafka/ExampleProducer.java @@ -4,9 +4,12 @@ import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; @Slf4j @@ -71,7 +74,19 @@ public class ExampleProducer value // Value ); - producer.send(record, (metadata, e) -> + CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> + { + try + { + return producer.send(record).get(); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + }); + + completableFuture.whenComplete((metadata, e) -> { long now = System.currentTimeMillis(); if (e == null)