]> juplo.de Git - demos/kafka/training/commitdiff
Ergebniskontrolle über das zurückgegebenen Future<RecordMetadata>
authorKai Moritz <kai@juplo.de>
Fri, 14 Mar 2025 09:21:52 +0000 (10:21 +0100)
committerKai Moritz <kai@juplo.de>
Sun, 22 Mar 2026 20:02:05 +0000 (21:02 +0100)
src/main/java/de/juplo/kafka/ExampleProducer.java

index a76458890f62b9c25ccc571a77a4f4821fb6fa9f..c81d3bf74ebc8894b98021e289444741d5aa25ed 100644 (file)
@@ -1,12 +1,14 @@
 package de.juplo.kafka;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.serialization.StringSerializer;
 
 import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
 
 
 @Slf4j
@@ -71,7 +73,20 @@ public class ExampleProducer
       value   // Value
     );
 
-    producer.send(record, (metadata, e) ->
+    CompletableFuture<RecordMetadata> completableFuture = CompletableFuture
+      .supplyAsync(() ->
+      {
+        try
+        {
+          return producer.send(record).get();
+        }
+        catch (Exception e)
+        {
+          throw new RuntimeException(e);
+        }
+      });
+
+    completableFuture.whenComplete((metadata, e) ->
     {
       long sendRequestProcessed = System.currentTimeMillis();
       if (e == null)