Ergebniskontrolle über das zurückgegebenen Future<RecordMetadata>
authorKai Moritz <kai@juplo.de>
Fri, 14 Mar 2025 09:21:52 +0000 (10:21 +0100)
committerKai Moritz <kai@juplo.de>
Fri, 14 Mar 2025 15:53:03 +0000 (16:53 +0100)
src/main/java/de/juplo/kafka/ExampleProducer.java

index 0bab442..6c585bf 100644 (file)
@@ -4,9 +4,12 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.serialization.StringSerializer;
 
 import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
 
 
 @Slf4j
@@ -71,7 +74,19 @@ public class ExampleProducer
       value   // Value
     );
 
-    producer.send(record, (metadata, e) ->
+    CompletableFuture<RecordMetadata> completableFuture = CompletableFuture.supplyAsync(() ->
+    {
+      try
+      {
+        return producer.send(record).get();
+      }
+      catch (Exception e)
+      {
+        throw new RuntimeException(e);
+      }
+    });
+
+    completableFuture.whenComplete((metadata, e) ->
     {
       long now = System.currentTimeMillis();
       if (e == null)