1 package de.juplo.kafka.payment.transfer;
7 import de.juplo.kafka.payment.avro.Order;
8 import de.juplo.kafka.payment.avro.OrderState;
9 import org.apache.kafka.clients.producer.KafkaProducer;
10 import org.apache.kafka.clients.producer.ProducerRecord;
11 import org.slf4j.Logger;
12 import org.slf4j.LoggerFactory;
13 import org.springframework.http.MediaType;
14 import org.springframework.http.ResponseEntity;
15 import org.springframework.web.bind.annotation.PostMapping;
16 import org.springframework.web.bind.annotation.RequestBody;
17 import org.springframework.web.bind.annotation.RestController;
18 import org.springframework.web.context.request.async.DeferredResult;
20 import javax.validation.Valid;
// Service class that holds a Kafka producer plus the topic and path settings
// used by the placeOrder endpoint.
// NOTE(review): class-level annotations and braces are not visible in this
// listing; RestController is imported, so presumably it is applied here - confirm.
24 public class TransferService
// Class-wide SLF4J logger.
26 private final static Logger LOG = LoggerFactory.getLogger(TransferService.class);
// Producer for records keyed by UUID with TransferBean values.
28 private final KafkaProducer<UUID, TransferBean> producer;
// Taken from TransferServiceProperties.getTopic() in the constructor.
// NOTE(review): presumably the topic the records are sent to - its use is not visible here.
29 private final String topic;
// Taken from TransferServiceProperties.getPath(); prefix of the Location URI
// returned by placeOrder (path + uuid).
30 private final String path;
// Constructor parameters: the producer to publish with, and the properties
// object that supplies the topic and path settings.
// NOTE(review): the opening line of the constructor signature is not visible here.
34 final KafkaProducer<UUID, TransferBean> producer,
35 final TransferServiceProperties properties)
// Keep the given producer and copy both settings out of the properties object,
// so the properties instance itself is not retained.
37 this.producer = producer;
38 this.topic = properties.getTopic();
39 this.path = properties.getPath();
// Mapping attributes: the endpoint consumes JSON (UTF-8 variant) and produces plain text.
// NOTE(review): the opening line of the mapping annotation is not visible here;
// PostMapping is imported, so presumably it is that one - confirm.
// NOTE(review): APPLICATION_JSON_UTF8_VALUE is deprecated in newer Spring
// versions in favour of APPLICATION_JSON_VALUE - check the Spring version in use.
45 consumes = MediaType.APPLICATION_JSON_UTF8_VALUE,
46 produces = MediaType.TEXT_PLAIN_VALUE)
// Takes a validated TransferBean from the request body and answers through a
// DeferredResult that is completed later by the code below.
47 public DeferredResult<ResponseEntity<?>> placeOrder(@Valid @RequestBody TransferBean transfer)
// Completed either from the producer callback or from the exception handling at the end.
49 DeferredResult<ResponseEntity<?>> result = new DeferredResult<>();
// Random identifier; appended to `path` to form the Location URI on success.
// NOTE(review): presumably also the record key - the record construction is only partly visible.
53 UUID uuid = UUID.randomUUID();
54 ProducerRecord<UUID, TransferBean> record =
// Builder-style chain that fills the payload from the incoming transfer; the
// start and end of this expression are not visible in this listing.
// Both the id and the order id are derived from transfer.getId().
// NOTE(review): `.id(...)` does not follow the `set...` naming of the
// neighbouring calls - verify against the builder type.
60 .id(transfer.getId().toString())
61 .setState(TransferState.CREATED)
62 .setCustomerId(transfer.getCustomerId())
63 .setOrderId(transfer.getId())
64 .setProductId(transfer.getProductId())
65 .setQuantity(transfer.getQuantity())
// Send with a completion callback that receives the record metadata and an exception.
68 producer.send(record, (metadata, exception) ->
// Non-null exception: the send failed.
70 if (exception != null)
// NOTE(review): only exception.toString() is logged, so the stack trace is lost;
// passing the exception itself as the last argument would keep it.
72 LOG.error("Could not place order {}: {}", transfer, exception.toString());
73 result.setErrorResult(exception);
// Complete the request with a "created" response whose Location is path + uuid.
// NOTE(review): presumably the else branch of the check above - the branch
// keyword and braces are not visible here.
77 result.setResult(ResponseEntity.created(URI.create(path + uuid)).build());
// Handling of an exception `e`; the catch clause itself is not visible here.
// The throwable is passed to the logger and the deferred result is failed with it.
82 LOG.error("Unexpected exception!", e);
83 result.setErrorResult(e);