1 package de.juplo.kafka.payment.transfer.adapter;
3 import com.fasterxml.jackson.core.JsonProcessingException;
4 import com.fasterxml.jackson.databind.ObjectMapper;
5 import de.juplo.kafka.payment.transfer.domain.Transfer;
6 import de.juplo.kafka.payment.transfer.ports.MessagingService;
7 import lombok.RequiredArgsConstructor;
8 import lombok.extern.slf4j.Slf4j;
9 import org.apache.kafka.clients.producer.KafkaProducer;
10 import org.apache.kafka.clients.producer.ProducerRecord;
11 import org.apache.kafka.common.TopicPartition;
13 import java.util.concurrent.CompletableFuture;
16 @RequiredArgsConstructor
18 public class KafkaMessagingService implements MessagingService
20 private final KafkaProducer<String, String> producer;
21 private final ObjectMapper mapper;
22 private final String topic;
26 public CompletableFuture<?> send(Transfer transfer)
30 CompletableFuture<TopicPartition> future = new CompletableFuture<>();
31 ProducerRecord<String, String> record =
34 Long.toString(transfer.getId()),
35 mapper.writeValueAsString(transfer));
37 producer.send(record, (metadata, exception) ->
41 log.debug("Sent {} to {}/{}:{}", transfer, metadata.topic(), metadata.partition(), metadata.offset());
42 future.complete(new TopicPartition(metadata.topic(), metadata.partition()));
46 log.error("Could not send {}: {}", transfer, exception.getMessage());
47 future.completeExceptionally(exception);
53 catch (JsonProcessingException e)
55 throw new RuntimeException("Could not convert " + transfer, e);