1 package de.juplo.kafka.payment.transfer.domain;
4 import com.fasterxml.jackson.databind.ObjectMapper;
5 import lombok.RequiredArgsConstructor;
6 import lombok.extern.slf4j.Slf4j;
7 import org.apache.kafka.clients.producer.KafkaProducer;
8 import org.apache.kafka.clients.producer.ProducerRecord;
9 import org.springframework.http.ResponseEntity;
12 import java.util.UUID;
/**
 * Service that initiates {@code Transfer}s by publishing them to a Kafka topic.
 *
 * <p>Collaborators (all {@code final}, so Lombok's {@code @RequiredArgsConstructor}
 * generates the constructor that receives them): a {@code TransferRepository},
 * a {@code KafkaProducer} with {@code UUID} keys and {@code String} values, a
 * Jackson {@code ObjectMapper}, and the name of the target topic.
 *
 * <p>NOTE(review): only part of this class is visible in this excerpt; comments
 * below describe the visible statements only.
 */
16 @RequiredArgsConstructor
17 public class TransferService
19 private final TransferRepository repository;
20 private final KafkaProducer<UUID, String> producer;
21 private final ObjectMapper mapper;
22 private final String topic;
/**
 * Initiates the given transfer.
 *
 * <p>Visible steps: a lookup by the transfer's id, serialization of the
 * transfer via {@code mapper.writeValueAsString}, and an asynchronous
 * {@code producer.send} whose callback reports the outcome through
 * {@code result}.
 *
 * @param transfer the transfer to initiate; its id is used for the lookup and
 *                 for the URI of the created resource
 */
24 public void initiate(Transfer transfer)
// Lookup by transfer id; the receiver of this call is not visible here
// (presumably the repository -- TODO confirm).
27 .get(transfer.getId())
// Record to publish; its value is the transfer serialized by the ObjectMapper.
39 ProducerRecord<UUID, String> record =
43 mapper.writeValueAsString(transfer));
// Asynchronous send: the callback receives the record metadata or an exception.
45 producer.send(record, (metadata, exception) ->
// A non-null exception means the send failed.
47 if (exception != null)
// NOTE(review): the message says "order" although the argument is a transfer,
// and only exception.toString() is logged (as a placeholder value), so no
// stack trace is passed to the logger here -- confirm both are intended.
49 log.error("Could not place order {}: {}", transfer, exception.toString());
// Hand the failure to whoever waits on `result` (declaration not visible).
50 result.setErrorResult(exception);
// Completes `result` with a "created" response whose location is
// `path` + transfer id (`path` is declared outside the visible lines;
// presumably this is the success branch -- TODO confirm).
54 result.setResult(ResponseEntity.created(URI.create(path + transfer.getId())).build());
// Handling for an exception `e` caught outside the callback (the enclosing
// catch is not visible): log it with the throwable and report it via `result`.
59 log.error("Unexpected exception!", e);
60 result.setErrorResult(e);