1 package de.trion.microservices.takeorder;
4 import de.trion.microservices.avro.Order;
5 import de.trion.microservices.avro.OrderState;
8 import javax.validation.Valid;
9 import org.apache.kafka.clients.producer.KafkaProducer;
10 import org.apache.kafka.clients.producer.ProducerRecord;
11 import org.slf4j.Logger;
12 import org.slf4j.LoggerFactory;
13 import org.springframework.http.MediaType;
14 import org.springframework.http.ResponseEntity;
15 import org.springframework.web.bind.annotation.PostMapping;
16 import org.springframework.web.bind.annotation.RequestBody;
17 import org.springframework.web.bind.annotation.RestController;
18 import org.springframework.web.context.request.async.DeferredResult;
22 public class TakeOrderService
24 private final static Logger LOG = LoggerFactory.getLogger(TakeOrderService.class);
26 private final KafkaProducer<UUID,Order> producer;
27 private final String topic;
28 private final String path;
32 final KafkaProducer<UUID,Order> producer,
33 final ApplicationProperties properties)
35 this.producer = producer;
36 this.topic = properties.getTopic();
37 this.path = properties.getPath();
43 consumes = MediaType.APPLICATION_JSON_UTF8_VALUE,
44 produces = MediaType.TEXT_PLAIN_VALUE)
45 public DeferredResult<ResponseEntity<?>> placeOrder(@Valid @RequestBody OrderBean order)
47 DeferredResult<ResponseEntity<?>> result = new DeferredResult<>();
51 UUID uuid = UUID.randomUUID();
52 ProducerRecord<UUID, Order> record =
58 .setId(uuid.toString())
59 .setState(OrderState.CREATED)
60 .setCustomerId(order.getCustomerId())
61 .setOrderId(order.getId())
62 .setProductId(order.getProductId())
63 .setQuantity(order.getQuantity())
66 producer.send(record, (metadata, exception) ->
68 if (exception != null)
70 LOG.error("Could not place order {}: {}", order, exception.toString());
71 result.setErrorResult(exception);
75 result.setResult(ResponseEntity.created(URI.create(path + uuid)).build());
80 LOG.error("Unexpected exception!", e);
81 result.setErrorResult(e);