MVP for transfer service
[demos/kafka/demos-kafka-payment-system-transfer] / src / main / java / de / juplo / kafka / payment / transfer / domain / TransferService.java
1 package de.juplo.kafka.payment.transfer.domain;
2
3
4 import de.juplo.kafka.payment.transfer.ports.GetTransferUseCase;
5 import de.juplo.kafka.payment.transfer.ports.InitiateTransferUseCase;
6 import de.juplo.kafka.payment.transfer.ports.MessagingService;
7 import de.juplo.kafka.payment.transfer.ports.TransferRepository;
8 import lombok.RequiredArgsConstructor;
9 import lombok.extern.slf4j.Slf4j;
10
11 import java.util.Optional;
12
13 import static de.juplo.kafka.payment.transfer.domain.Transfer.State.*;
14
15
16 @Slf4j
17 @RequiredArgsConstructor
18 public class TransferService implements InitiateTransferUseCase, GetTransferUseCase
19 {
20   private final TransferRepository repository;
21   private final MessagingService messagingService;
22
23   public synchronized void initiate(Transfer transfer)
24   {
25     repository
26         .get(transfer.getId())
27         .ifPresentOrElse(
28             stored ->
29             {
30               if (!transfer.equals(stored))
31                 throw new IllegalArgumentException(
32                     "Re-Initiation of transfer with different data: old=" +
33                         stored +
34                         ", new=" +
35                         transfer);
36
37               if (stored.getState() == FAILED)
38               {
39                 repository.update(transfer.getId(), FAILED, SENT);
40                 log.info("Resending faild transfer: " + stored);
41                 send(transfer);
42               }
43             },
44             () ->
45             {
46               send(transfer);
47               transfer.setState(SENT);
48               repository.store(transfer);
49             });
50   }
51
52   private void send(Transfer transfer)
53   {
54     messagingService
55         .send(transfer)
56         .thenApply(
57             $ ->
58             {
59               repository.update(transfer.getId(), SENT, PENDING);
60               return null;
61             })
62         .exceptionally(
63             e ->
64             {
65               repository.update(transfer.getId(), SENT, FAILED);
66               return null;
67             });
68   }
69
70   public Optional<Transfer> get(Long id)
71   {
72     return repository.get(id);
73   }
74 }