X-Git-Url: https://juplo.de/gitweb/?p=demos%2Fkafka%2Fdemos-kafka-payment-system-transfer;a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fpayment%2Ftransfer%2Fdomain%2FTransferService.java;h=90ef682cc5c95dee868717f938c481cf7f982b93;hp=3e6265fa0cacd1b3e76ddce63b222bb63a53473f;hb=fae41770a1f65b4ddfe8d51d09a8a8cdc35a5bdd;hpb=6191849fee717b080118717c86df79fad12bafc8 diff --git a/src/main/java/de/juplo/kafka/payment/transfer/domain/TransferService.java b/src/main/java/de/juplo/kafka/payment/transfer/domain/TransferService.java index 3e6265f..90ef682 100644 --- a/src/main/java/de/juplo/kafka/payment/transfer/domain/TransferService.java +++ b/src/main/java/de/juplo/kafka/payment/transfer/domain/TransferService.java @@ -2,7 +2,7 @@ package de.juplo.kafka.payment.transfer.domain; import de.juplo.kafka.payment.transfer.ports.GetTransferUseCase; -import de.juplo.kafka.payment.transfer.ports.InitiateTransferUseCase; +import de.juplo.kafka.payment.transfer.ports.HandleTransferUseCase; import de.juplo.kafka.payment.transfer.ports.MessagingService; import de.juplo.kafka.payment.transfer.ports.TransferRepository; import lombok.RequiredArgsConstructor; @@ -10,61 +10,62 @@ import lombok.extern.slf4j.Slf4j; import java.util.Optional; -import static de.juplo.kafka.payment.transfer.domain.Transfer.State.*; +import static de.juplo.kafka.payment.transfer.domain.Transfer.State.CHECKED; +import static de.juplo.kafka.payment.transfer.domain.Transfer.State.CREATED; @Slf4j @RequiredArgsConstructor -public class TransferService implements InitiateTransferUseCase, GetTransferUseCase +public class TransferService implements HandleTransferUseCase, GetTransferUseCase { private final TransferRepository repository; private final MessagingService messagingService; - public synchronized void initiate(Transfer transfer) + private void create(Transfer transfer) { repository .get(transfer.getId()) .ifPresentOrElse( - stored -> - { - if (!transfer.equals(stored)) - throw new IllegalArgumentException( - "Re-Initiation of transfer with different data: old=" + - stored + - ", new=" + - transfer); - - if (stored.getState() == FAILED) - { - repository.update(transfer.getId(), FAILED, SENT); - log.info("Resending faild transfer: " + stored); - send(transfer); - } - }, + stored -> log.info("transfer already exisits: {}, ignoring: {}", stored, transfer), () -> { - send(transfer); - transfer.setState(SENT); repository.store(transfer); + transfer.setState(CREATED); + messagingService.send(transfer); }); } - private void send(Transfer transfer) + @Override + public void handle(Transfer transfer) { - messagingService - .send(transfer) - .thenApply( - $ -> - { - repository.update(transfer.getId(), SENT, PENDING); - return null; - }) - .exceptionally( - e -> - { - repository.update(transfer.getId(), SENT, FAILED); - return null; - }); + Transfer.State state = transfer.getState(); + switch (state) + { + case RECEIVED: + repository.store(transfer); + create(transfer); + break; + + case CREATED: + repository.store(transfer); + check(transfer); + break; + + case CHECKED: + repository.store(transfer); + // TODO: What's next...? + break; + + default: + log.warn("TODO: handle {} state {}", state.foreign ? "foreign" : "domain", state); + } + } + + private void check(Transfer transfer) + { + // TODO: Do some time consuming checks... + transfer.setState(CHECKED); + messagingService.send(transfer); } public Optional get(Long id)