1 package de.juplo.kafka.payment.transfer.adapter;
3 import com.fasterxml.jackson.core.JsonProcessingException;
4 import com.fasterxml.jackson.databind.ObjectMapper;
5 import de.juplo.kafka.payment.transfer.domain.Transfer;
6 import de.juplo.kafka.payment.transfer.ports.MessagingService;
7 import lombok.RequiredArgsConstructor;
8 import lombok.extern.slf4j.Slf4j;
9 import org.apache.kafka.clients.producer.KafkaProducer;
10 import org.apache.kafka.clients.producer.ProducerRecord;
11 import org.apache.kafka.common.TopicPartition;
13 import java.util.concurrent.CompletableFuture;
/**
 * {@link MessagingService} implementation that publishes transfer events
 * through a {@link KafkaProducer}.
 *
 * Each event payload is converted to a string with the Jackson
 * {@link ObjectMapper} and used as the record value; the event type is
 * attached to the record as a single-byte header.
 */
16 @RequiredArgsConstructor
18 public class KafkaMessagingService implements MessagingService
// Producer that the records are handed to.
20 private final KafkaProducer<String, String> producer;
// Serializes event payloads into the string that becomes the record value.
21 private final ObjectMapper mapper;
// NOTE(review): presumably the topic the records are written to; the
// ProducerRecord constructor arguments are not visible here - confirm.
22 private final String topic;
/**
 * Publishes a {@code NEW_TRANSFER} event for the given transfer.
 *
 * The payload is built with {@code NewTransferEvent.ofTransfer(transfer)}
 * and the call is delegated to the private {@code send} with the id of the
 * transfer.
 *
 * @param transfer the transfer to announce
 * @return the future produced by the private {@code send}
 * @throws RuntimeException if the payload cannot be converted to a string
 */
26 public CompletableFuture<?> send(Transfer transfer)
28 return send(transfer.getId(), EventType.NEW_TRANSFER, NewTransferEvent.ofTransfer(transfer));
/**
 * Publishes a {@code TRANSFER_STATE_CHANGED} event for the transfer with
 * the given id.
 *
 * The payload is a new {@code TransferStateChangedEvent} holding the id and
 * the state.
 *
 * @param id    the id of the transfer
 * @param state the state to report
 * @return the future produced by the private {@code send}
 * @throws RuntimeException if the payload cannot be converted to a string
 */
31 public CompletableFuture<?> send(Long id, Transfer.State state)
33 return send(id, EventType.TRANSFER_STATE_CHANGED, new TransferStateChangedEvent(id, state));
/**
 * Serializes the payload, wraps it in a record tagged with the event type
 * and hands the record to the producer.
 *
 * The callback passed to the producer either completes the future with a
 * {@link TopicPartition} built from the record metadata, or completes it
 * exceptionally with the exception reported by the producer.
 *
 * NOTE(review): the return type is the raw {@code CompletableFuture},
 * although the future created below is a
 * {@code CompletableFuture<TopicPartition>} - consider parameterizing it.
 *
 * @param id        the id of the affected transfer (NOTE(review): presumably
 *                  used for the record key - not visible here, confirm)
 * @param eventType the byte written into the {@code EventType.HEADER} header
 * @param payload   the event object that is serialized as record value
 * @return NOTE(review): presumably the future created below - the return
 *         statement is not visible here, confirm
 * @throws RuntimeException wrapping the {@link JsonProcessingException} if
 *         the payload cannot be converted to a string
 */
36 private CompletableFuture send(Long id, byte eventType, Object payload)
// Completed from inside the producer callback below.
40 CompletableFuture<TopicPartition> future = new CompletableFuture<>();
// The serialized payload is the last constructor argument of the record.
42 ProducerRecord<String, String> record =
46 mapper.writeValueAsString(payload));
// The event type is transported as a one-element byte array in a header.
47 record.headers().add(EventType.HEADER, new byte[] { eventType });
// NOTE(review): the condition that selects between the success and the
// failure handling in this callback is not visible here - confirm.
49 producer.send(record, (metadata, exception) ->
// Success handling: log the position and expose topic and partition.
53 log.debug("Sent {} to {}/{}:{}", payload, metadata.topic(), metadata.partition(), metadata.offset());
54 future.complete(new TopicPartition(metadata.topic(), metadata.partition()));
// Failure handling: the future carries the exception to the caller.
// NOTE(review): only the exception message is logged, not the stack trace.
58 log.error("Could not send {}: {}", payload, exception.getMessage());
59 future.completeExceptionally(exception);
// A payload that cannot be serialized is reported as an unchecked exception
// that keeps the original exception as its cause.
65 catch (JsonProcessingException e)
67 throw new RuntimeException("Could not convert " + payload, e);