Migration auf `CompletableFuture`
authorKai Moritz <kai@juplo.de>
Tue, 12 Nov 2024 03:02:17 +0000 (04:02 +0100)
committerKai Moritz <kai@juplo.de>
Tue, 12 Nov 2024 11:19:44 +0000 (12:19 +0100)
src/main/java/de/juplo/kafka/ExampleProducer.java

index a64dfc4..ba95dfb 100644 (file)
@@ -8,7 +8,8 @@ import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.kafka.support.SendResult;
-import org.springframework.util.concurrent.ListenableFuture;
+
+import java.util.concurrent.CompletableFuture;
 
 
 // tag::supersimple[]
@@ -26,19 +27,22 @@ public class ExampleProducer implements ApplicationRunner
     {
       // end::supersimple[]
       // tag::callback[]
-      ListenableFuture<SendResult<String, String>> listenableFuture =
+      CompletableFuture<SendResult<String, String>> completableFuture =
       // tag::supersimple[]
           kafkaTemplate.sendDefault(Long.toString(i%10), Long.toString(i));
       // end::supersimple[]
 
-      listenableFuture.addCallback(
-          result -> log.info(
-              "Sent {}={} to partition={}, offset={}",
-              result.getProducerRecord().key(),
-              result.getProducerRecord().value(),
-              result.getRecordMetadata().partition(),
-              result.getRecordMetadata().offset()),
-          e -> log.error("ERROR sendig message", e));
+      completableFuture.thenAccept(result ->
+        log.info(
+          "Sent {}={} to partition={}, offset={}",
+          result.getProducerRecord().key(),
+          result.getProducerRecord().value(),
+          result.getRecordMetadata().partition(),
+          result.getRecordMetadata().offset()));
+      completableFuture.exceptionally(e -> {
+        log.error("ERROR sendig message", e);
+        return null;
+      });
       // end::callback[]
       // tag::supersimple[]
     }