WIP
[demos/kafka/demos-kafka-payment-system-transfer] / src / main / java / de / juplo / kafka / payment / transfer / ports / TransferService.java
1 package de.juplo.kafka.payment.transfer.ports;
2
3
4 import de.juplo.kafka.payment.transfer.adapter.TransferStateChangedEvent;
5 import de.juplo.kafka.payment.transfer.domain.Transfer;
6 import lombok.RequiredArgsConstructor;
7 import lombok.extern.slf4j.Slf4j;
8
9 import java.util.Optional;
10
11 import static de.juplo.kafka.payment.transfer.domain.Transfer.State.CHECKED;
12 import static de.juplo.kafka.payment.transfer.domain.Transfer.State.CREATED;
13
14
15 @Slf4j
16 @RequiredArgsConstructor
17 public class TransferService implements CreateTransferUseCase, HandleStateChangeUseCase, GetTransferUseCase
18 {
19   private final TransferRepository repository;
20   private final MessagingService messagingService;
21
22   @Override
23   public Optional<TransferStateChangedEvent> create(Long id, Long payer, Long payee, Integer amount)
24   {
25     return
26     repository
27         .get(id)
28         .flatMap(
29             stored ->
30             {log.info(
31                 "transfer already exisits: {}, ignoring: id={}, payer={}, payee={}, amount={}",
32                 stored,
33                 payer,
34                 payee,
35                 amount);
36             return Optional.empty();
37             })
38         .or(() ->
39             {
40               log.info("creating transfer: {}", transfer);
41               return
42                   TransferStateChangedEvent
43                       .builder()
44                       .id(transfer.getId())
45                       .state(CREATED)
46                       .build();
47             });
48   }
49
50   @Override
51   public TransferStateChangedEvent handleStateChange(Long id, Transfer.State state)
52   {
53     get(id)
54         .ifPresentOrElse(
55             transfer ->
56             {
57               switch (state)
58               {
59                 case CREATED:
60
61                   transfer.setState(CREATED);
62                   repository.store(transfer);
63                   check(transfer);
64                   break;
65
66                 case CHECKED:
67
68                   transfer.setState(CHECKED);
69                   repository.store(transfer);
70                   // TODO: What's next...?
71                   break;
72
73                 default:
74
75                   log.warn("TODO: handle {} state {}", state.foreign ? "foreign" : "domain", state);
76               }
77             },
78             () -> log.error("unknown transfer: {}", id));
79   }
80
81   private void check(Transfer transfer)
82   {
83     // TODO: Do some time consuming checks...
84     messagingService.send(transfer.getId(), CHECKED);
85   }
86
87   public Optional<Transfer> get(Long id)
88   {
89     return repository.get(id);
90   }
91 }