1 package de.juplo.kafka.payment.transfer.impl;
4 import com.fasterxml.jackson.databind.ObjectMapper;
5 import de.juplo.kafka.payment.transfer.api.Transfer;
6 import de.juplo.kafka.payment.transfer.api.TransferService;
7 import de.juplo.kafka.payment.transfer.persistence.TransferRepository;
8 import lombok.RequiredArgsConstructor;
9 import lombok.extern.slf4j.Slf4j;
10 import org.apache.kafka.clients.producer.KafkaProducer;
11 import org.apache.kafka.clients.producer.ProducerRecord;
12 import org.springframework.http.ResponseEntity;
13 import org.springframework.web.bind.annotation.RequestBody;
14 import org.springframework.web.context.request.async.DeferredResult;
16 import javax.validation.Valid;
18 import java.util.UUID;
22 @RequiredArgsConstructor
23 public class TransferServiceImpl implements TransferService
25 private final TransferRepository repository;
26 private final KafkaProducer<UUID, String> producer;
27 private final ObjectMapper mapper;
28 private final String topic;
32 public void initiate(Transfer transfer)
35 .get(transfer.getId())
47 ProducerRecord<UUID, String> record =
51 mapper.writeValueAsString(transfer));
53 producer.send(record, (metadata, exception) ->
55 if (exception != null)
57 log.error("Could not place order {}: {}", transfer, exception.toString());
58 result.setErrorResult(exception);
62 result.setResult(ResponseEntity.created(URI.create(path + transfer.getId())).build());
67 log.error("Unexpected exception!", e);
68 result.setErrorResult(e);