+package de.juplo.kafka.payment.transfer;
+
+
+import de.trion.microservices.avro.Order;
+import de.trion.microservices.avro.OrderState;
+import java.net.URI;
+import java.util.UUID;
+import javax.validation.Valid;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.http.MediaType;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RestController;
+import org.springframework.web.context.request.async.DeferredResult;
+
+
+@RestController
+public class TransferService
+{
+ private final static Logger LOG = LoggerFactory.getLogger(TransferService.class);
+
+ private final KafkaProducer<UUID,Order> producer;
+ private final String topic;
+ private final String path;
+
+
+ TransferService(
+ final KafkaProducer<UUID,Order> producer,
+ final TransferServiceProperties properties)
+ {
+ this.producer = producer;
+ this.topic = properties.getTopic();
+ this.path = properties.getPath();
+ }
+
+
+ @PostMapping(
+ path = "/orders",
+ consumes = MediaType.APPLICATION_JSON_UTF8_VALUE,
+ produces = MediaType.TEXT_PLAIN_VALUE)
+ public DeferredResult<ResponseEntity<?>> placeOrder(@Valid @RequestBody OrderBean order)
+ {
+ DeferredResult<ResponseEntity<?>> result = new DeferredResult<>();
+
+ try
+ {
+ UUID uuid = UUID.randomUUID();
+ ProducerRecord<UUID, Order> record =
+ new ProducerRecord<>(
+ topic,
+ uuid,
+ Order
+ .newBuilder()
+ .setId(uuid.toString())
+ .setState(OrderState.CREATED)
+ .setCustomerId(order.getCustomerId())
+ .setOrderId(order.getId())
+ .setProductId(order.getProductId())
+ .setQuantity(order.getQuantity())
+ .build());
+
+ producer.send(record, (metadata, exception) ->
+ {
+ if (exception != null)
+ {
+ LOG.error("Could not place order {}: {}", order, exception.toString());
+ result.setErrorResult(exception);
+ return;
+ }
+
+ result.setResult(ResponseEntity.created(URI.create(path + uuid)).build());
+ });
+ }
+ catch (Exception e)
+ {
+ LOG.error("Unexpected exception!", e);
+ result.setErrorResult(e);
+ }
+
+ return result;
+ }
+}