X-Git-Url: https://juplo.de/gitweb/?p=demos%2Fkafka%2Fdemos-kafka-payment-system-transfer;a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fpayment%2Ftransfer%2Fdomain%2FTransferService.java;fp=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fpayment%2Ftransfer%2Fdomain%2FTransferService.java;h=0cbcd2cfc681594fafd4c5890e6eb8afbfa1fd4d;hp=3e6265fa0cacd1b3e76ddce63b222bb63a53473f;hb=4467c5240397a47b181106a0ae902ed1b71d0c5d;hpb=540f0c5e8ef2c815d7ff37c7af2e119c448cbb1b diff --git a/src/main/java/de/juplo/kafka/payment/transfer/domain/TransferService.java b/src/main/java/de/juplo/kafka/payment/transfer/domain/TransferService.java index 3e6265f..0cbcd2c 100644 --- a/src/main/java/de/juplo/kafka/payment/transfer/domain/TransferService.java +++ b/src/main/java/de/juplo/kafka/payment/transfer/domain/TransferService.java @@ -1,26 +1,53 @@ package de.juplo.kafka.payment.transfer.domain; -import de.juplo.kafka.payment.transfer.ports.GetTransferUseCase; -import de.juplo.kafka.payment.transfer.ports.InitiateTransferUseCase; -import de.juplo.kafka.payment.transfer.ports.MessagingService; -import de.juplo.kafka.payment.transfer.ports.TransferRepository; +import de.juplo.kafka.payment.transfer.ports.*; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.common.TopicPartition; import java.util.Optional; +import java.util.concurrent.CompletableFuture; -import static de.juplo.kafka.payment.transfer.domain.Transfer.State.*; +import static de.juplo.kafka.payment.transfer.domain.Transfer.State.CHECKED; +import static de.juplo.kafka.payment.transfer.domain.Transfer.State.RECEIVED; @Slf4j @RequiredArgsConstructor -public class TransferService implements InitiateTransferUseCase, GetTransferUseCase +public class TransferService implements ReceiveTransferUseCase, HandleTransferUseCase, GetTransferUseCase { private final TransferRepository repository; private final MessagingService messagingService; - public synchronized void initiate(Transfer transfer) + public CompletableFuture receive(Transfer transfer) + { + transfer.setState(RECEIVED); + return messagingService.send(transfer); + } + + @Override + public void handle(Transfer transfer) + { + Transfer.State state = transfer.getState(); + switch (state) + { + case RECEIVED: + repository.store(transfer); + check(transfer); + break; + + case CHECKED: + repository.store(transfer); + // TODO: What's next...? + break; + + default: + log.warn("TODO: handle {} state {}", state.foreign ? "foreign" : "domain", state); + } + } + + private void check(Transfer transfer) { repository .get(transfer.getId()) @@ -28,42 +55,14 @@ public class TransferService implements InitiateTransferUseCase, GetTransferUseC stored -> { if (!transfer.equals(stored)) - throw new IllegalArgumentException( - "Re-Initiation of transfer with different data: old=" + - stored + - ", new=" + - transfer); - - if (stored.getState() == FAILED) - { - repository.update(transfer.getId(), FAILED, SENT); - log.info("Resending faild transfer: " + stored); - send(transfer); - } + log.error("ignoring already received transfer with differing data: old={}, new={}", stored, transfer); }, () -> { - send(transfer); - transfer.setState(SENT); repository.store(transfer); - }); - } - - private void send(Transfer transfer) - { - messagingService - .send(transfer) - .thenApply( - $ -> - { - repository.update(transfer.getId(), SENT, PENDING); - return null; - }) - .exceptionally( - e -> - { - repository.update(transfer.getId(), SENT, FAILED); - return null; + // TODO: Do some time consuming checks... + transfer.setState(CHECKED); + messagingService.send(transfer); }); }