1 package de.juplo.kafka.payment.transfer;
4 import com.fasterxml.jackson.databind.ObjectMapper;
5 import org.apache.kafka.clients.producer.KafkaProducer;
6 import org.apache.kafka.clients.producer.ProducerRecord;
7 import org.slf4j.Logger;
8 import org.slf4j.LoggerFactory;
9 import org.springframework.http.MediaType;
10 import org.springframework.http.ResponseEntity;
11 import org.springframework.web.bind.annotation.PostMapping;
12 import org.springframework.web.bind.annotation.RequestBody;
13 import org.springframework.web.bind.annotation.RestController;
14 import org.springframework.web.context.request.async.DeferredResult;
16 import javax.validation.Valid;
18 import java.util.UUID;
22 public class TransferService
24 private final static Logger LOG = LoggerFactory.getLogger(TransferService.class);
26 private final KafkaProducer<UUID, String> producer;
27 private final ObjectMapper mapper;
28 private final String topic;
29 private final String path;
33 final KafkaProducer<UUID, String> producer,
34 final ObjectMapper mapper,
35 final TransferServiceProperties properties)
37 this.producer = producer;
39 this.topic = properties.getTopic();
40 this.path = properties.getPath();
46 consumes = MediaType.APPLICATION_JSON_VALUE,
47 produces = MediaType.TEXT_PLAIN_VALUE)
48 public DeferredResult<ResponseEntity<?>> transfer(@Valid @RequestBody Transfer transfer)
50 DeferredResult<ResponseEntity<?>> result = new DeferredResult<>();
54 ProducerRecord<UUID, String> record =
58 mapper.writeValueAsString(transfer));
60 producer.send(record, (metadata, exception) ->
62 if (exception != null)
64 LOG.error("Could not place order {}: {}", transfer, exception.toString());
65 result.setErrorResult(exception);
69 result.setResult(ResponseEntity.created(URI.create(path + transfer.getId())).build());
74 LOG.error("Unexpected exception!", e);
75 result.setErrorResult(e);