From: Kai Moritz Date: Sat, 19 Jun 2021 07:08:24 +0000 (+0200) Subject: TransferController sends a message, instead of calling TransferService X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=fae41770a1f65b4ddfe8d51d09a8a8cdc35a5bdd;p=demos%2Fkafka%2Fdemos-kafka-payment-system-transfer TransferController sends a message, instead of calling TransferService --- diff --git a/src/main/java/de/juplo/kafka/payment/transfer/adapter/KafkaMessagingService.java b/src/main/java/de/juplo/kafka/payment/transfer/adapter/KafkaMessagingService.java index 0f8cf2b..3161af3 100644 --- a/src/main/java/de/juplo/kafka/payment/transfer/adapter/KafkaMessagingService.java +++ b/src/main/java/de/juplo/kafka/payment/transfer/adapter/KafkaMessagingService.java @@ -23,7 +23,7 @@ public class KafkaMessagingService implements MessagingService @Override - public CompletableFuture send(Transfer transfer) + public CompletableFuture send(Transfer transfer) { try { diff --git a/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferController.java b/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferController.java index f31d1a8..8240310 100644 --- a/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferController.java +++ b/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferController.java @@ -3,7 +3,7 @@ package de.juplo.kafka.payment.transfer.adapter; import de.juplo.kafka.payment.transfer.domain.Transfer; import de.juplo.kafka.payment.transfer.ports.GetTransferUseCase; -import de.juplo.kafka.payment.transfer.ports.ReceiveTransferUseCase; +import de.juplo.kafka.payment.transfer.ports.MessagingService; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.http.HttpStatus; @@ -20,6 +20,8 @@ import java.net.URI; import java.util.Date; import java.util.HashMap; import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; @RestController @@ -29,8 +31,8 @@ import java.util.Map; { public final static String PATH = "/transfers"; - private final ReceiveTransferUseCase receiveTransferUseCase; private final GetTransferUseCase getTransferUseCase; + private final MessagingService messagingService; @PostMapping( @@ -41,36 +43,48 @@ import java.util.Map; HttpServletRequest request, @Valid @RequestBody TransferDTO transferDTO) { - Transfer transfer = - Transfer - .builder() - .id(transferDTO.getId()) - .payer(transferDTO.getPayer()) - .payee(transferDTO.getPayee()) - .amount(transferDTO.getAmount()) - .build(); - DeferredResult> result = new DeferredResult<>(); - receiveTransferUseCase - .receive(transfer) - .thenApply( - $ -> + getTransferUseCase + .get(transferDTO.getId()) + .map(transfer -> + CompletableFuture.completedFuture( ResponseEntity - .created(URI.create(PATH + "/" + transferDTO.getId())) - .build()) - .thenAccept( - responseEntity -> result.setResult(responseEntity)) - .exceptionally( - e -> - { - result.setErrorResult(e); - return null; - }); + .ok() + .location(location(transferDTO)) + .build())) + .or(() -> + Optional.of( + messagingService + .send( + Transfer + .builder() + .id(transferDTO.getId()) + .payer(transferDTO.getPayer()) + .payee(transferDTO.getPayee()) + .amount(transferDTO.getAmount()) + .state(Transfer.State.RECEIVED) + .build()) + .thenApply($ -> + ResponseEntity + .created(location(transferDTO)) + .build()))) + .get() + .thenAccept(responseEntity -> result.setResult(responseEntity)) + .exceptionally(e -> + { + result.setErrorResult(e); + return null; + }); return result; } + private URI location(TransferDTO transferDTO) + { + return URI.create(PATH + "/" + transferDTO.getId()); + } + @GetMapping( path = PATH + "/{id}", produces = MediaType.APPLICATION_JSON_VALUE) diff --git a/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferDTO.java b/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferDTO.java index 1119e72..ff50377 100644 --- a/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferDTO.java +++ b/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferDTO.java @@ -31,19 +31,6 @@ public class TransferDTO private Transfer.State state; - public Transfer toTransfer() - { - return - Transfer - .builder() - .id(id) - .payer(payer) - .payee(payee) - .amount(amount) - .build(); - } - - public static TransferDTO of(Transfer transfer) { return diff --git a/src/main/java/de/juplo/kafka/payment/transfer/domain/Transfer.java b/src/main/java/de/juplo/kafka/payment/transfer/domain/Transfer.java index 82891b7..cc207d9 100644 --- a/src/main/java/de/juplo/kafka/payment/transfer/domain/Transfer.java +++ b/src/main/java/de/juplo/kafka/payment/transfer/domain/Transfer.java @@ -17,6 +17,7 @@ public class Transfer public enum State { RECEIVED(false), + CREATED(false), INVALID(false), CHECKED(false), APPROVED(true), diff --git a/src/main/java/de/juplo/kafka/payment/transfer/domain/TransferService.java b/src/main/java/de/juplo/kafka/payment/transfer/domain/TransferService.java index 0cbcd2c..90ef682 100644 --- a/src/main/java/de/juplo/kafka/payment/transfer/domain/TransferService.java +++ b/src/main/java/de/juplo/kafka/payment/transfer/domain/TransferService.java @@ -1,29 +1,38 @@ package de.juplo.kafka.payment.transfer.domain; -import de.juplo.kafka.payment.transfer.ports.*; +import de.juplo.kafka.payment.transfer.ports.GetTransferUseCase; +import de.juplo.kafka.payment.transfer.ports.HandleTransferUseCase; +import de.juplo.kafka.payment.transfer.ports.MessagingService; +import de.juplo.kafka.payment.transfer.ports.TransferRepository; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.common.TopicPartition; import java.util.Optional; -import java.util.concurrent.CompletableFuture; import static de.juplo.kafka.payment.transfer.domain.Transfer.State.CHECKED; -import static de.juplo.kafka.payment.transfer.domain.Transfer.State.RECEIVED; +import static de.juplo.kafka.payment.transfer.domain.Transfer.State.CREATED; @Slf4j @RequiredArgsConstructor -public class TransferService implements ReceiveTransferUseCase, HandleTransferUseCase, GetTransferUseCase +public class TransferService implements HandleTransferUseCase, GetTransferUseCase { private final TransferRepository repository; private final MessagingService messagingService; - public CompletableFuture receive(Transfer transfer) + private void create(Transfer transfer) { - transfer.setState(RECEIVED); - return messagingService.send(transfer); + repository + .get(transfer.getId()) + .ifPresentOrElse( + stored -> log.info("transfer already exisits: {}, ignoring: {}", stored, transfer), + () -> + { + repository.store(transfer); + transfer.setState(CREATED); + messagingService.send(transfer); + }); } @Override @@ -33,6 +42,11 @@ public class TransferService implements ReceiveTransferUseCase, HandleTransferUs switch (state) { case RECEIVED: + repository.store(transfer); + create(transfer); + break; + + case CREATED: repository.store(transfer); check(transfer); break; @@ -49,21 +63,9 @@ public class TransferService implements ReceiveTransferUseCase, HandleTransferUs private void check(Transfer transfer) { - repository - .get(transfer.getId()) - .ifPresentOrElse( - stored -> - { - if (!transfer.equals(stored)) - log.error("ignoring already received transfer with differing data: old={}, new={}", stored, transfer); - }, - () -> - { - repository.store(transfer); - // TODO: Do some time consuming checks... - transfer.setState(CHECKED); - messagingService.send(transfer); - }); + // TODO: Do some time consuming checks... + transfer.setState(CHECKED); + messagingService.send(transfer); } public Optional get(Long id) diff --git a/src/main/java/de/juplo/kafka/payment/transfer/ports/MessagingService.java b/src/main/java/de/juplo/kafka/payment/transfer/ports/MessagingService.java index 81e7555..4037a90 100644 --- a/src/main/java/de/juplo/kafka/payment/transfer/ports/MessagingService.java +++ b/src/main/java/de/juplo/kafka/payment/transfer/ports/MessagingService.java @@ -7,5 +7,5 @@ import java.util.concurrent.CompletableFuture; public interface MessagingService { - CompletableFuture send(Transfer transfer); + CompletableFuture send(Transfer transfer); } diff --git a/src/main/java/de/juplo/kafka/payment/transfer/ports/ReceiveTransferUseCase.java b/src/main/java/de/juplo/kafka/payment/transfer/ports/ReceiveTransferUseCase.java deleted file mode 100644 index f892fb3..0000000 --- a/src/main/java/de/juplo/kafka/payment/transfer/ports/ReceiveTransferUseCase.java +++ /dev/null @@ -1,12 +0,0 @@ -package de.juplo.kafka.payment.transfer.ports; - -import de.juplo.kafka.payment.transfer.domain.Transfer; -import org.apache.kafka.common.TopicPartition; - -import java.util.concurrent.CompletableFuture; - - -public interface ReceiveTransferUseCase -{ - CompletableFuture receive(Transfer transfer); -}