9e72af163c9f2b21caa226ce256e6b6d7a9c98e5
[demos/kafka/demos-kafka-payment-system-transfer] / transfer / src / main / java / de / juplo / kafka / payment / transfer / TransferService.java
1 package de.juplo.kafka.payment.transfer;
2
3
4 import com.fasterxml.jackson.databind.ObjectMapper;
5 import org.apache.kafka.clients.producer.KafkaProducer;
6 import org.apache.kafka.clients.producer.ProducerRecord;
7 import org.slf4j.Logger;
8 import org.slf4j.LoggerFactory;
9 import org.springframework.http.MediaType;
10 import org.springframework.http.ResponseEntity;
11 import org.springframework.web.bind.annotation.PostMapping;
12 import org.springframework.web.bind.annotation.RequestBody;
13 import org.springframework.web.bind.annotation.RestController;
14 import org.springframework.web.context.request.async.DeferredResult;
15
16 import javax.validation.Valid;
17 import java.net.URI;
18 import java.util.UUID;
19
20
21 @RestController
22 public class TransferService
23 {
24   private final static Logger LOG = LoggerFactory.getLogger(TransferService.class);
25
26   private final KafkaProducer<UUID, String> producer;
27   private final ObjectMapper mapper;
28   private final String topic;
29   private final String path;
30
31
32   TransferService(
33       final KafkaProducer<UUID, String> producer,
34       final ObjectMapper mapper,
35       final TransferServiceProperties properties)
36   {
37     this.producer = producer;
38     this.mapper = mapper;
39     this.topic = properties.getTopic();
40     this.path = properties.getPath();
41   }
42
43
44   @PostMapping(
45       path = "/transfer",
46       consumes = MediaType.APPLICATION_JSON_VALUE,
47       produces = MediaType.TEXT_PLAIN_VALUE)
48   public DeferredResult<ResponseEntity<?>> transfer(@Valid @RequestBody Transfer transfer)
49   {
50     DeferredResult<ResponseEntity<?>> result = new DeferredResult<>();
51
52     try
53     {
54       ProducerRecord<UUID, String> record =
55           new ProducerRecord<>(
56               topic,
57               transfer.getId(),
58               mapper.writeValueAsString(transfer));
59
60       producer.send(record, (metadata, exception) ->
61       {
62         if (exception != null)
63         {
64           LOG.error("Could not place order {}: {}", transfer, exception.toString());
65           result.setErrorResult(exception);
66           return;
67         }
68
69         result.setResult(ResponseEntity.created(URI.create(path + transfer.getId())).build());
70       });
71     }
72     catch (Exception e)
73     {
74       LOG.error("Unexpected exception!", e);
75       result.setErrorResult(e);
76     }
77
78     return result;
79   }
80 }