WIP
[demos/kafka/demos-kafka-payment-system-transfer] / transfer / src / main / java / de / juplo / kafka / payment / transfer / domain / TransferService.java
1 package de.juplo.kafka.payment.transfer.domain;
2
3
4 import com.fasterxml.jackson.core.JsonProcessingException;
5 import com.fasterxml.jackson.databind.ObjectMapper;
6 import lombok.RequiredArgsConstructor;
7 import lombok.extern.slf4j.Slf4j;
8 import org.apache.kafka.clients.producer.KafkaProducer;
9 import org.apache.kafka.clients.producer.ProducerRecord;
10
11 import java.util.Optional;
12
13 import static de.juplo.kafka.payment.transfer.domain.Transfer.State.*;
14
15
16 @Slf4j
17 @RequiredArgsConstructor
18 public class TransferService
19 {
20   private final TransferRepository repository;
21   private final KafkaProducer<String, String> producer;
22   private final ObjectMapper mapper;
23   private final String topic;
24
25   public synchronized void initiate(Transfer transfer)
26   {
27     repository
28         .get(transfer.getId())
29         .ifPresentOrElse(
30             stored ->
31             {
32               if (!transfer.equals(stored))
33                 throw new IllegalArgumentException(
34                     "Re-Initiation of transfer with different data: old=" +
35                         stored +
36                         ", new=" +
37                         transfer);
38
39               if (stored.getState() == FAILED)
40               {
41                 repository.update(transfer.getId(), FAILED, SENT);
42                 log.info("Resending faild transfer: " + stored);
43                 send(transfer);
44               }
45             },
46             () ->
47             {
48               send(transfer);
49               transfer.setState(SENT);
50               repository.store(transfer);
51             });
52   }
53
54
55   private void send(Transfer transfer)
56   {
57     try
58     {
59       ProducerRecord<String, String> record =
60           new ProducerRecord<>(
61               topic,
62               Long.toString(transfer.getId()),
63               mapper.writeValueAsString(transfer));
64
65       producer.send(record, (metadata, exception) ->
66       {
67         if (metadata != null)
68         {
69           log.debug("Sent {} to {}/{}:{}", transfer, metadata.topic(), metadata.partition(), metadata.offset());
70           repository.update(transfer.getId(), SENT, PENDING);
71         }
72         else
73         {
74           log.error("Could not send {}: {}", transfer, exception.getMessage());
75           repository.update(transfer.getId(), SENT, FAILED);
76         }
77       });
78     }
79     catch (JsonProcessingException e)
80     {
81       throw new RuntimeException("Could not convert " + transfer, e);
82     }
83   }
84
85   public Optional<Transfer> get(Long id)
86   {
87     return repository.get(id);
88   }
89 }