f5d41cb0f2031c9207190d3cbd5b9a133df763ff
[demos/kafka/demos-kafka-payment-system-transfer] / TransferService.java
1 package de.juplo.kafka.payment.transfer.domain;
2
3
4 import com.fasterxml.jackson.core.JsonProcessingException;
5 import com.fasterxml.jackson.databind.ObjectMapper;
6 import lombok.RequiredArgsConstructor;
7 import lombok.extern.slf4j.Slf4j;
8 import org.apache.kafka.clients.producer.KafkaProducer;
9 import org.apache.kafka.clients.producer.ProducerRecord;
10 import org.springframework.http.ResponseEntity;
11
12 import java.net.URI;
13 import java.util.UUID;
14
15 import static de.juplo.kafka.payment.transfer.domain.Transfer.State.*;
16
17
18 @Slf4j
19 @RequiredArgsConstructor
20 public class TransferService
21 {
22   private final TransferRepository repository;
23   private final KafkaProducer<String, String> producer;
24   private final ObjectMapper mapper;
25   private final String topic;
26
27   public synchronized void initiate(Transfer transfer)
28   {
29     repository
30         .get(transfer.getId())
31         .ifPresentOrElse(
32             stored ->
33             {
34               if (!transfer.equals(stored))
35                 throw new IllegalArgumentException(
36                     "Re-Initiation of transfer with different data: old=" +
37                         stored +
38                         ", new=" +
39                         transfer);
40
41               if (stored.getState() == FAILED)
42               {
43                 repository.update(transfer.getId(), FAILED, SENT);
44                 log.info("Resending faild transfer: " + stored);
45                 send(transfer);
46               }
47             },
48             () ->
49             {
50               repository.store(transfer);
51               send(transfer);
52             });
53   }
54
55
56   private void send(Transfer transfer)
57   {
58     try
59     {
60       ProducerRecord<String, String> record =
61           new ProducerRecord<>(
62               topic,
63               Long.toString(transfer.getId()),
64               mapper.writeValueAsString(transfer));
65
66       producer.send(record, (metadata, exception) ->
67       {
68         if (metadata != null)
69         {
70           log.debug("Sent {} to {}/{}:{}", transfer, metadata.topic(), metadata.partition(), metadata.offset());
71           repository.update(transfer.getId(), SENT, PENDING);
72         }
73         else
74         {
75           log.error("Could not send {}: {}", transfer, exception.getMessage());
76           repository.update(transfer.getId(), SENT, FAILED);
77         }
78       });
79     }
80     catch (JsonProcessingException e)
81     {
82       throw new RuntimeException("Could not convert " + transfer, e);
83     }
84   }
85 }