From: Kai Moritz Date: Fri, 14 Mar 2025 09:21:52 +0000 (+0100) Subject: Ergebniskontrolle über das zurückgegebenen Future X-Git-Tag: grundlagen/simple-producer--completablefuture--2026-03-22--22-01~6 X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=b1a93930c4e9bfde928c442d79673d6b8643f9f4;p=demos%2Fkafka%2Ftraining Ergebniskontrolle über das zurückgegebenen Future --- diff --git a/src/main/java/de/juplo/kafka/ExampleProducer.java b/src/main/java/de/juplo/kafka/ExampleProducer.java index a7645889..c81d3bf7 100644 --- a/src/main/java/de/juplo/kafka/ExampleProducer.java +++ b/src/main/java/de/juplo/kafka/ExampleProducer.java @@ -1,12 +1,14 @@ package de.juplo.kafka; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; +import java.util.concurrent.CompletableFuture; @Slf4j @@ -71,7 +73,20 @@ public class ExampleProducer value // Value ); - producer.send(record, (metadata, e) -> + CompletableFuture completableFuture = CompletableFuture + .supplyAsync(() -> + { + try + { + return producer.send(record).get(); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + }); + + completableFuture.whenComplete((metadata, e) -> { long sendRequestProcessed = System.currentTimeMillis(); if (e == null)