From: Kai Moritz Date: Sat, 19 Jun 2021 08:53:31 +0000 (+0200) Subject: Moved all business logic into TransferService X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=951dcf0533551cccf48062c12e61192035a27a9a;p=demos%2Fkafka%2Fdemos-kafka-payment-system-transfer Moved all business logic into TransferService --- diff --git a/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java b/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java index 259b62d..557b857 100644 --- a/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java +++ b/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java @@ -80,9 +80,9 @@ public class TransferServiceApplication mapper, new TransferConsumer.ConsumerUseCases() { @Override - public void create(Transfer transfer) + public void create(Long id, Long payer, Long payee, Integer amount) { - productionTransferService.create(transfer); + productionTransferService.create(id, payer, payee, amount); } @Override @@ -92,16 +92,16 @@ public class TransferServiceApplication } @Override - public void handle(Transfer transfer) + public void handleStateChange(Long id, Transfer.State state) { - productionTransferService.handle(transfer); + productionTransferService.handleStateChange(id, state); } }, new TransferConsumer.ConsumerUseCases() { @Override - public void create(Transfer transfer) + public void create(Long id, Long payer, Long payee, Integer amount) { - restoreTransferService.create(transfer); + restoreTransferService.create(id, payer, payee, amount); } @Override @@ -111,9 +111,9 @@ public class TransferServiceApplication } @Override - public void handle(Transfer transfer) + public void handleStateChange(Long id, Transfer.State state) { - restoreTransferService.handle(transfer); + restoreTransferService.handleStateChange(id, state); } }); } diff --git a/src/main/java/de/juplo/kafka/payment/transfer/adapter/NewTransferEvent.java b/src/main/java/de/juplo/kafka/payment/transfer/adapter/NewTransferEvent.java index 0c5e271..2606d2a 100644 --- a/src/main/java/de/juplo/kafka/payment/transfer/adapter/NewTransferEvent.java +++ b/src/main/java/de/juplo/kafka/payment/transfer/adapter/NewTransferEvent.java @@ -16,18 +16,6 @@ public class NewTransferEvent private Long payee; private Integer amount; - public Transfer toTransfer() - { - return - Transfer - .builder() - .id(id) - .payer(payer) - .payee(payee) - .amount(amount) - .build(); - } - public static NewTransferEvent ofTransfer(Transfer transfer) { return diff --git a/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java b/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java index 251588d..63fbef5 100644 --- a/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java +++ b/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java @@ -2,7 +2,6 @@ package de.juplo.kafka.payment.transfer.adapter; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -import de.juplo.kafka.payment.transfer.domain.Transfer; import de.juplo.kafka.payment.transfer.ports.CreateTransferUseCase; import de.juplo.kafka.payment.transfer.ports.GetTransferUseCase; import de.juplo.kafka.payment.transfer.ports.HandleStateChangeUseCase; @@ -27,8 +26,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.stream.Collectors; -import static de.juplo.kafka.payment.transfer.domain.Transfer.State.CREATED; - @RequestMapping("/consumer") @ResponseBody @@ -83,18 +80,19 @@ public class TransferConsumer implements Runnable NewTransferEvent newTransferEvent = mapper.readValue(record.value(), NewTransferEvent.class); - useCases.create(newTransferEvent.toTransfer().setState(CREATED)); + useCases + .create( + newTransferEvent.getId(), + newTransferEvent.getPayer(), + newTransferEvent.getPayee(), + newTransferEvent.getAmount()); break; case EventType.TRANSFER_STATE_CHANGED: TransferStateChangedEvent stateChangedEvent = mapper.readValue(record.value(), TransferStateChangedEvent.class); - useCases - .get(stateChangedEvent.getId()) - .ifPresentOrElse( - transfer -> useCases.handle(transfer.setState(stateChangedEvent.getState())), - () -> log.error("unknown transfer: {}", stateChangedEvent.getId())); + useCases.handleStateChange(stateChangedEvent.getId(), stateChangedEvent.getState()); break; } } diff --git a/src/main/java/de/juplo/kafka/payment/transfer/domain/TransferService.java b/src/main/java/de/juplo/kafka/payment/transfer/domain/TransferService.java index 00c5478..ed1a98e 100644 --- a/src/main/java/de/juplo/kafka/payment/transfer/domain/TransferService.java +++ b/src/main/java/de/juplo/kafka/payment/transfer/domain/TransferService.java @@ -19,14 +19,28 @@ public class TransferService implements CreateTransferUseCase, HandleStateChange private final MessagingService messagingService; @Override - public void create(Transfer transfer) + public void create(Long id, Long payer, Long payee, Integer amount) { repository - .get(transfer.getId()) + .get(id) .ifPresentOrElse( - stored -> log.info("transfer already exisits: {}, ignoring: {}", stored, transfer), + stored -> log.info( + "transfer already exisits: {}, ignoring: id={}, payer={}, payee={}, amount={}", + stored, + payer, + payee, + amount), () -> { + Transfer transfer = + Transfer + .builder() + .id(id) + .payer(payer) + .payee(payee) + .amount(amount) + .build(); + log.info("creating transfer: {}", transfer); repository.store(transfer); messagingService.send(transfer.getId(), CREATED); @@ -34,24 +48,34 @@ public class TransferService implements CreateTransferUseCase, HandleStateChange } @Override - public void handle(Transfer transfer) + public void handleStateChange(Long id, Transfer.State state) { - Transfer.State state = transfer.getState(); - switch (state) - { - case CREATED: - repository.store(transfer); - check(transfer); - break; - - case CHECKED: - repository.store(transfer); - // TODO: What's next...? - break; - - default: - log.warn("TODO: handle {} state {}", state.foreign ? "foreign" : "domain", state); - } + get(id) + .ifPresentOrElse( + transfer -> + { + switch (state) + { + case CREATED: + + transfer.setState(CREATED); + repository.store(transfer); + check(transfer); + break; + + case CHECKED: + + transfer.setState(CHECKED); + repository.store(transfer); + // TODO: What's next...? + break; + + default: + + log.warn("TODO: handle {} state {}", state.foreign ? "foreign" : "domain", state); + } + }, + () -> log.error("unknown transfer: {}", id)); } private void check(Transfer transfer) diff --git a/src/main/java/de/juplo/kafka/payment/transfer/ports/CreateTransferUseCase.java b/src/main/java/de/juplo/kafka/payment/transfer/ports/CreateTransferUseCase.java index 34ae0e9..bfe6156 100644 --- a/src/main/java/de/juplo/kafka/payment/transfer/ports/CreateTransferUseCase.java +++ b/src/main/java/de/juplo/kafka/payment/transfer/ports/CreateTransferUseCase.java @@ -5,5 +5,5 @@ import de.juplo.kafka.payment.transfer.domain.Transfer; public interface CreateTransferUseCase { - void create(Transfer transfer); + void create(Long id, Long payer, Long payee, Integer amount); } diff --git a/src/main/java/de/juplo/kafka/payment/transfer/ports/HandleStateChangeUseCase.java b/src/main/java/de/juplo/kafka/payment/transfer/ports/HandleStateChangeUseCase.java index 2e75fc0..9b3e270 100644 --- a/src/main/java/de/juplo/kafka/payment/transfer/ports/HandleStateChangeUseCase.java +++ b/src/main/java/de/juplo/kafka/payment/transfer/ports/HandleStateChangeUseCase.java @@ -5,5 +5,5 @@ import de.juplo.kafka.payment.transfer.domain.Transfer; public interface HandleStateChangeUseCase { - void handle(Transfer transfer); + void handleStateChange(Long id, Transfer.State state); }