From: Kai Moritz Date: Tue, 12 Nov 2024 03:02:17 +0000 (+0100) Subject: Migration auf `CompletableFuture` X-Git-Tag: spring/supersimple-producer--BRANCH-ENDE~2 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=fa2af2235a56319040e18c4b2e10a3be5102cea3;p=demos%2Fkafka%2Ftraining Migration auf `CompletableFuture` --- diff --git a/src/main/java/de/juplo/kafka/ExampleProducer.java b/src/main/java/de/juplo/kafka/ExampleProducer.java index a64dfc4..ba95dfb 100644 --- a/src/main/java/de/juplo/kafka/ExampleProducer.java +++ b/src/main/java/de/juplo/kafka/ExampleProducer.java @@ -8,7 +8,8 @@ import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; -import org.springframework.util.concurrent.ListenableFuture; + +import java.util.concurrent.CompletableFuture; // tag::supersimple[] @@ -26,19 +27,22 @@ public class ExampleProducer implements ApplicationRunner { // end::supersimple[] // tag::callback[] - ListenableFuture> listenableFuture = + CompletableFuture> completableFuture = // tag::supersimple[] kafkaTemplate.sendDefault(Long.toString(i%10), Long.toString(i)); // end::supersimple[] - listenableFuture.addCallback( - result -> log.info( - "Sent {}={} to partition={}, offset={}", - result.getProducerRecord().key(), - result.getProducerRecord().value(), - result.getRecordMetadata().partition(), - result.getRecordMetadata().offset()), - e -> log.error("ERROR sendig message", e)); + completableFuture.thenAccept(result -> + log.info( + "Sent {}={} to partition={}, offset={}", + result.getProducerRecord().key(), + result.getProducerRecord().value(), + result.getRecordMetadata().partition(), + result.getRecordMetadata().offset())); + completableFuture.exceptionally(e -> { + log.error("ERROR sendig message", e); + return null; + }); // end::callback[] // tag::supersimple[] }