WIP
[demos/kafka/demos-kafka-payment-system-transfer] / src / main / java / de / juplo / kafka / payment / transfer / adapter / KafkaMessagingService.java
1 package de.juplo.kafka.payment.transfer.adapter;
2
3 import com.fasterxml.jackson.core.JsonProcessingException;
4 import com.fasterxml.jackson.databind.ObjectMapper;
5 import de.juplo.kafka.payment.transfer.domain.Transfer;
6 import de.juplo.kafka.payment.transfer.ports.MessagingService;
7 import lombok.RequiredArgsConstructor;
8 import lombok.extern.slf4j.Slf4j;
9 import org.apache.kafka.clients.producer.KafkaProducer;
10 import org.apache.kafka.clients.producer.ProducerRecord;
11 import org.apache.kafka.common.TopicPartition;
12
13 import java.util.concurrent.CompletableFuture;
14
15
16 @RequiredArgsConstructor
17 @Slf4j
18 public class KafkaMessagingService implements MessagingService
19 {
20   private final KafkaProducer<String, String> producer;
21   private final ObjectMapper mapper;
22   private final String topic;
23
24
25   @Override
26   public CompletableFuture<?> send(Transfer transfer)
27   {
28     return send(transfer.getId(), EventType.NEW_TRANSFER, NewTransferEvent.ofTransfer(transfer));
29   }
30
31   public CompletableFuture<?> send(Long id, Transfer.State state)
32   {
33     return send(id, EventType.TRANSFER_STATE_CHANGED, new TransferStateChangedEvent(id, state));
34   }
35
36   private CompletableFuture send(Long id, byte eventType, Object payload)
37   {
38     try
39     {
40       CompletableFuture<TopicPartition> future = new CompletableFuture<>();
41
42       ProducerRecord<String, String> record =
43           new ProducerRecord<>(
44               topic,
45               Long.toString(id),
46               mapper.writeValueAsString(payload));
47       record.headers().add(EventType.HEADER, new byte[] { eventType });
48
49       producer.send(record, (metadata, exception) ->
50       {
51         if (metadata != null)
52         {
53           log.debug("Sent {} to {}/{}:{}", payload, metadata.topic(), metadata.partition(), metadata.offset());
54           future.complete(new TopicPartition(metadata.topic(), metadata.partition()));
55         }
56         else
57         {
58           log.error("Could not send {}: {}", payload, exception.getMessage());
59           future.completeExceptionally(exception);
60         }
61       });
62
63       return future;
64     }
65     catch (JsonProcessingException e)
66     {
67       throw new RuntimeException("Could not convert " + payload, e);
68     }
69   }
70
71 }