import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
-import org.springframework.util.concurrent.ListenableFuture;
+
+import java.util.concurrent.CompletableFuture;
// tag::supersimple[]
{
// end::supersimple[]
// tag::callback[]
- ListenableFuture<SendResult<String, String>> listenableFuture =
+ CompletableFuture<SendResult<String, String>> completableFuture =
// tag::supersimple[]
kafkaTemplate.sendDefault(Long.toString(i%10), Long.toString(i));
// end::supersimple[]
- listenableFuture.addCallback(
- result -> log.info(
- "Sent {}={} to partition={}, offset={}",
- result.getProducerRecord().key(),
- result.getProducerRecord().value(),
- result.getRecordMetadata().partition(),
- result.getRecordMetadata().offset()),
- e -> log.error("ERROR sendig message", e));
+ completableFuture.thenAccept(result ->
+ log.info(
+ "Sent {}={} to partition={}, offset={}",
+ result.getProducerRecord().key(),
+ result.getProducerRecord().value(),
+ result.getRecordMetadata().partition(),
+ result.getRecordMetadata().offset()));
+ completableFuture.exceptionally(e -> {
+ log.error("ERROR sendig message", e);
+ return null;
+ });
// end::callback[]
// tag::supersimple[]
}