X-Git-Url: https://juplo.de/gitweb/?p=demos%2Fkafka%2Fdemos-kafka-payment-system-transfer;a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fpayment%2Ftransfer%2Fdomain%2FTransferService.java;fp=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fpayment%2Ftransfer%2Fdomain%2FTransferService.java;h=90ef682cc5c95dee868717f938c481cf7f982b93;hp=0cbcd2cfc681594fafd4c5890e6eb8afbfa1fd4d;hb=fae41770a1f65b4ddfe8d51d09a8a8cdc35a5bdd;hpb=4467c5240397a47b181106a0ae902ed1b71d0c5d diff --git a/src/main/java/de/juplo/kafka/payment/transfer/domain/TransferService.java b/src/main/java/de/juplo/kafka/payment/transfer/domain/TransferService.java index 0cbcd2c..90ef682 100644 --- a/src/main/java/de/juplo/kafka/payment/transfer/domain/TransferService.java +++ b/src/main/java/de/juplo/kafka/payment/transfer/domain/TransferService.java @@ -1,29 +1,38 @@ package de.juplo.kafka.payment.transfer.domain; -import de.juplo.kafka.payment.transfer.ports.*; +import de.juplo.kafka.payment.transfer.ports.GetTransferUseCase; +import de.juplo.kafka.payment.transfer.ports.HandleTransferUseCase; +import de.juplo.kafka.payment.transfer.ports.MessagingService; +import de.juplo.kafka.payment.transfer.ports.TransferRepository; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.common.TopicPartition; import java.util.Optional; -import java.util.concurrent.CompletableFuture; import static de.juplo.kafka.payment.transfer.domain.Transfer.State.CHECKED; -import static de.juplo.kafka.payment.transfer.domain.Transfer.State.RECEIVED; +import static de.juplo.kafka.payment.transfer.domain.Transfer.State.CREATED; @Slf4j @RequiredArgsConstructor -public class TransferService implements ReceiveTransferUseCase, HandleTransferUseCase, GetTransferUseCase +public class TransferService implements HandleTransferUseCase, GetTransferUseCase { private final TransferRepository repository; private final MessagingService messagingService; - public CompletableFuture receive(Transfer transfer) + private void create(Transfer transfer) { - transfer.setState(RECEIVED); - return messagingService.send(transfer); + repository + .get(transfer.getId()) + .ifPresentOrElse( + stored -> log.info("transfer already exisits: {}, ignoring: {}", stored, transfer), + () -> + { + repository.store(transfer); + transfer.setState(CREATED); + messagingService.send(transfer); + }); } @Override @@ -33,6 +42,11 @@ public class TransferService implements ReceiveTransferUseCase, HandleTransferUs switch (state) { case RECEIVED: + repository.store(transfer); + create(transfer); + break; + + case CREATED: repository.store(transfer); check(transfer); break; @@ -49,21 +63,9 @@ public class TransferService implements ReceiveTransferUseCase, HandleTransferUs private void check(Transfer transfer) { - repository - .get(transfer.getId()) - .ifPresentOrElse( - stored -> - { - if (!transfer.equals(stored)) - log.error("ignoring already received transfer with differing data: old={}, new={}", stored, transfer); - }, - () -> - { - repository.store(transfer); - // TODO: Do some time consuming checks... - transfer.setState(CHECKED); - messagingService.send(transfer); - }); + // TODO: Do some time consuming checks... + transfer.setState(CHECKED); + messagingService.send(transfer); } public Optional get(Long id)