TransferController sends a message, instead of calling TransferService
[demos/kafka/demos-kafka-payment-system-transfer] / src / main / java / de / juplo / kafka / payment / transfer / adapter / KafkaMessagingService.java
1 package de.juplo.kafka.payment.transfer.adapter;
2
3 import com.fasterxml.jackson.core.JsonProcessingException;
4 import com.fasterxml.jackson.databind.ObjectMapper;
5 import de.juplo.kafka.payment.transfer.domain.Transfer;
6 import de.juplo.kafka.payment.transfer.ports.MessagingService;
7 import lombok.RequiredArgsConstructor;
8 import lombok.extern.slf4j.Slf4j;
9 import org.apache.kafka.clients.producer.KafkaProducer;
10 import org.apache.kafka.clients.producer.ProducerRecord;
11 import org.apache.kafka.common.TopicPartition;
12
13 import java.util.concurrent.CompletableFuture;
14
15
16 @RequiredArgsConstructor
17 @Slf4j
18 public class KafkaMessagingService implements MessagingService
19 {
20   private final KafkaProducer<String, String> producer;
21   private final ObjectMapper mapper;
22   private final String topic;
23
24
25   @Override
26   public CompletableFuture<?> send(Transfer transfer)
27   {
28     try
29     {
30       CompletableFuture<TopicPartition> future = new CompletableFuture<>();
31       ProducerRecord<String, String> record =
32           new ProducerRecord<>(
33               topic,
34               Long.toString(transfer.getId()),
35               mapper.writeValueAsString(transfer));
36
37       producer.send(record, (metadata, exception) ->
38       {
39         if (metadata != null)
40         {
41           log.debug("Sent {} to {}/{}:{}", transfer, metadata.topic(), metadata.partition(), metadata.offset());
42           future.complete(new TopicPartition(metadata.topic(), metadata.partition()));
43         }
44         else
45         {
46           log.error("Could not send {}: {}", transfer, exception.getMessage());
47           future.completeExceptionally(exception);
48         }
49       });
50
51       return future;
52     }
53     catch (JsonProcessingException e)
54     {
55       throw new RuntimeException("Could not convert " + transfer, e);
56     }
57   }
58
59 }