TransferRepository does not need any synchronization
[demos/kafka/demos-kafka-payment-system-transfer] / src / main / java / de / juplo / kafka / payment / transfer / domain / TransferService.java
1 package de.juplo.kafka.payment.transfer.domain;
2
3
4 import de.juplo.kafka.payment.transfer.ports.*;
5 import lombok.RequiredArgsConstructor;
6 import lombok.extern.slf4j.Slf4j;
7 import org.apache.kafka.common.TopicPartition;
8
9 import java.util.Optional;
10 import java.util.concurrent.CompletableFuture;
11
12 import static de.juplo.kafka.payment.transfer.domain.Transfer.State.CHECKED;
13 import static de.juplo.kafka.payment.transfer.domain.Transfer.State.RECEIVED;
14
15
16 @Slf4j
17 @RequiredArgsConstructor
18 public class TransferService implements ReceiveTransferUseCase, HandleTransferUseCase, GetTransferUseCase
19 {
20   private final TransferRepository repository;
21   private final MessagingService messagingService;
22
23   public CompletableFuture<TopicPartition> receive(Transfer transfer)
24   {
25     transfer.setState(RECEIVED);
26     return messagingService.send(transfer);
27   }
28
29   @Override
30   public void handle(Transfer transfer)
31   {
32     Transfer.State state = transfer.getState();
33     switch (state)
34     {
35       case RECEIVED:
36         repository.store(transfer);
37         check(transfer);
38         break;
39
40       case CHECKED:
41         repository.store(transfer);
42         // TODO: What's next...?
43         break;
44
45       default:
46         log.warn("TODO: handle {} state {}", state.foreign ? "foreign" : "domain", state);
47     }
48   }
49
50   private void check(Transfer transfer)
51   {
52     repository
53         .get(transfer.getId())
54         .ifPresentOrElse(
55             stored ->
56             {
57               if (!transfer.equals(stored))
58                 log.error("ignoring already received transfer with differing data: old={}, new={}", stored, transfer);
59             },
60             () ->
61             {
62               repository.store(transfer);
63               // TODO: Do some time consuming checks...
64               transfer.setState(CHECKED);
65               messagingService.send(transfer);
66             });
67   }
68
69   public Optional<Transfer> get(Long id)
70   {
71     return repository.get(id);
72   }
73 }