9a24b2bdf1428c07fbc00cba549b843cf66644ff
[demos/kafka/demos-kafka-payment-system-transfer] / transfer / src / main / java / de / juplo / kafka / payment / transfer / impl / TransferServiceImpl.java
1 package de.juplo.kafka.payment.transfer.impl;
2
3
4 import com.fasterxml.jackson.databind.ObjectMapper;
5 import de.juplo.kafka.payment.transfer.api.Transfer;
6 import de.juplo.kafka.payment.transfer.api.TransferService;
7 import de.juplo.kafka.payment.transfer.persistence.TransferRepository;
8 import lombok.RequiredArgsConstructor;
9 import lombok.extern.slf4j.Slf4j;
10 import org.apache.kafka.clients.producer.KafkaProducer;
11 import org.apache.kafka.clients.producer.ProducerRecord;
12 import org.springframework.http.ResponseEntity;
13 import org.springframework.web.bind.annotation.RequestBody;
14 import org.springframework.web.context.request.async.DeferredResult;
15
16 import javax.validation.Valid;
17 import java.net.URI;
18 import java.util.UUID;
19
20
21 @Slf4j
22 @RequiredArgsConstructor
23 public class TransferServiceImpl implements TransferService
24 {
25   private final TransferRepository repository;
26   private final KafkaProducer<UUID, String> producer;
27   private final ObjectMapper mapper;
28   private final String topic;
29
30
31   @Override
32   public void initiate(Transfer transfer)
33   {
34     repository
35         .get(transfer.getId())
36         .ifPresentOrElse(
37             stored ->
38             {
39
40             },
41             () ->
42             {
43
44             });
45     try
46     {
47       ProducerRecord<UUID, String> record =
48           new ProducerRecord<>(
49               topic,
50               transfer.getId(),
51               mapper.writeValueAsString(transfer));
52
53       producer.send(record, (metadata, exception) ->
54       {
55         if (exception != null)
56         {
57           log.error("Could not place order {}: {}", transfer, exception.toString());
58           result.setErrorResult(exception);
59           return;
60         }
61
62         result.setResult(ResponseEntity.created(URI.create(path + transfer.getId())).build());
63       });
64     }
65     catch (Exception e)
66     {
67       log.error("Unexpected exception!", e);
68       result.setErrorResult(e);
69     }
70
71     return result;
72   }
73 }