WIP
[demos/kafka/demos-kafka-payment-system-transfer] / transfer / src / main / java / de / juplo / kafka / payment / transfer / TransferService.java
1 package de.juplo.kafka.payment.transfer;
2
3
4 import java.net.URI;
5 import java.util.UUID;
6
7 import de.juplo.kafka.payment.avro.Order;
8 import de.juplo.kafka.payment.avro.OrderState;
9 import org.apache.kafka.clients.producer.KafkaProducer;
10 import org.apache.kafka.clients.producer.ProducerRecord;
11 import org.slf4j.Logger;
12 import org.slf4j.LoggerFactory;
13 import org.springframework.http.MediaType;
14 import org.springframework.http.ResponseEntity;
15 import org.springframework.web.bind.annotation.PostMapping;
16 import org.springframework.web.bind.annotation.RequestBody;
17 import org.springframework.web.bind.annotation.RestController;
18 import org.springframework.web.context.request.async.DeferredResult;
19
20 import javax.validation.Valid;
21
22
23 @RestController
24 public class TransferService
25 {
26   private final static Logger LOG = LoggerFactory.getLogger(TransferService.class);
27
28   private final KafkaProducer<UUID, Order> producer;
29   private final String topic;
30   private final String path;
31
32
33   TransferService(
34       final KafkaProducer<UUID,Order> producer,
35       final TransferServiceProperties properties)
36   {
37     this.producer = producer;
38     this.topic = properties.getTopic();
39     this.path = properties.getPath();
40   }
41
42
43   @PostMapping(
44       path = "/orders",
45       consumes = MediaType.APPLICATION_JSON_UTF8_VALUE,
46       produces = MediaType.TEXT_PLAIN_VALUE)
47   public DeferredResult<ResponseEntity<?>> placeOrder(@Valid @RequestBody Transfer order)
48   {
49     DeferredResult<ResponseEntity<?>> result = new DeferredResult<>();
50
51     try
52     {
53       UUID uuid = UUID.randomUUID();
54       ProducerRecord<UUID, Order> record =
55           new ProducerRecord<>(
56               topic, 
57               uuid,
58               Order
59                   .newBuilder()
60                   .setId(uuid.toString())
61                   .setState(OrderState.CREATED)
62                   .setCustomerId(order.getCustomerId())
63                   .setOrderId(order.getId())
64                   .setProductId(order.getProductId())
65                   .setQuantity(order.getQuantity())
66                   .build());
67
68       producer.send(record, (metadata, exception) ->
69       {
70         if (exception != null)
71         {
72           LOG.error("Could not place order {}: {}", order, exception.toString());
73           result.setErrorResult(exception);
74           return;
75         }
76
77         result.setResult(ResponseEntity.created(URI.create(path + uuid)).build());
78       });
79     }
80     catch (Exception e)
81     {
82       LOG.error("Unexpected exception!", e);
83       result.setErrorResult(e);
84     }
85
86     return result;
87   }
88 }