1 package de.juplo.kafka.payment.transfer.domain;
4 import de.juplo.kafka.payment.transfer.ports.*;
5 import lombok.RequiredArgsConstructor;
6 import lombok.extern.slf4j.Slf4j;
7 import org.apache.kafka.common.TopicPartition;
9 import java.util.Optional;
10 import java.util.concurrent.CompletableFuture;
12 import static de.juplo.kafka.payment.transfer.domain.Transfer.State.CHECKED;
13 import static de.juplo.kafka.payment.transfer.domain.Transfer.State.RECEIVED;
17 @RequiredArgsConstructor
18 public class TransferService implements ReceiveTransferUseCase, HandleTransferUseCase, GetTransferUseCase
20 private final TransferRepository repository;
21 private final MessagingService messagingService;
23 public CompletableFuture<TopicPartition> receive(Transfer transfer)
25 transfer.setState(RECEIVED);
26 return messagingService.send(transfer);
30 public void handle(Transfer transfer)
32 Transfer.State state = transfer.getState();
36 repository.store(transfer);
41 repository.store(transfer);
42 // TODO: What's next...?
46 log.warn("TODO: handle {} state {}", state.foreign ? "foreign" : "domain", state);
50 private void check(Transfer transfer)
53 .get(transfer.getId())
57 if (!transfer.equals(stored))
58 log.error("ignoring already received transfer with differing data: old={}, new={}", stored, transfer);
62 repository.store(transfer);
63 // TODO: Do some time consuming checks...
64 transfer.setState(CHECKED);
65 messagingService.send(transfer);
69 public Optional<Transfer> get(Long id)
71 return repository.get(id);