From: Kai Moritz Date: Mon, 15 Nov 2021 20:41:03 +0000 (+0100) Subject: WIP X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=refs%2Fheads%2Fmaster;p=demos%2Fkafka%2Fdemos-kafka-payment-system-transfer WIP --- diff --git a/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java b/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java index a5c1faf..e94f703 100644 --- a/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java +++ b/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java @@ -113,9 +113,9 @@ public class TransferServiceApplication mapper, new TransferConsumer.ConsumerUseCases() { @Override - public void create(Long id, Long payer, Long payee, Integer amount) + public TransferStateChangedEvent create(Long id, Long payer, Long payee, Integer amount) { - productionTransferService.create(id, payer, payee, amount); + return productionTransferService.create(id, payer, payee, amount); } @Override @@ -125,16 +125,17 @@ public class TransferServiceApplication } @Override - public void handleStateChange(Long id, Transfer.State state) + public TransferStateChangedEvent handleStateChange( + TransferStateChangedEvent stateChangedEvent) { - productionTransferService.handleStateChange(id, state); + return productionTransferService.handleStateChange(stateChangedEvent); } }, new TransferConsumer.ConsumerUseCases() { @Override - public void create(Long id, Long payer, Long payee, Integer amount) + public TransferStateChangedEvent create(Long id, Long payer, Long payee, Integer amount) { - restoreTransferService.create(id, payer, payee, amount); + return restoreTransferService.create(id, payer, payee, amount); } @Override @@ -144,9 +145,10 @@ public class TransferServiceApplication } @Override - public void handleStateChange(Long id, Transfer.State state) + public TransferStateChangedEvent handleStateChange( + TransferStateChangedEvent stateChangedEvent) { - restoreTransferService.handleStateChange(id, state); + return restoreTransferService.handleStateChange(stateChangedEvent); } }); } diff --git a/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java b/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java index 1cae540..c66c534 100644 --- a/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java +++ b/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java @@ -172,7 +172,7 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener TransferStateChangedEvent stateChangedEvent = mapper.readValue(record.value(), TransferStateChangedEvent.class); - useCases.handleStateChange(stateChangedEvent.getId(), stateChangedEvent.getState()); + useCases.handleStateChange(stateChangedEvent); break; } } diff --git a/src/main/java/de/juplo/kafka/payment/transfer/ports/CreateTransferUseCase.java b/src/main/java/de/juplo/kafka/payment/transfer/ports/CreateTransferUseCase.java index bfe6156..8591a0d 100644 --- a/src/main/java/de/juplo/kafka/payment/transfer/ports/CreateTransferUseCase.java +++ b/src/main/java/de/juplo/kafka/payment/transfer/ports/CreateTransferUseCase.java @@ -1,9 +1,11 @@ package de.juplo.kafka.payment.transfer.ports; -import de.juplo.kafka.payment.transfer.domain.Transfer; +import de.juplo.kafka.payment.transfer.adapter.TransferStateChangedEvent; + +import java.util.Optional; public interface CreateTransferUseCase { - void create(Long id, Long payer, Long payee, Integer amount); + Optional create(Long id, Long payer, Long payee, Integer amount); } diff --git a/src/main/java/de/juplo/kafka/payment/transfer/ports/HandleStateChangeUseCase.java b/src/main/java/de/juplo/kafka/payment/transfer/ports/HandleStateChangeUseCase.java index 9b3e270..7d9fe83 100644 --- a/src/main/java/de/juplo/kafka/payment/transfer/ports/HandleStateChangeUseCase.java +++ b/src/main/java/de/juplo/kafka/payment/transfer/ports/HandleStateChangeUseCase.java @@ -1,9 +1,9 @@ package de.juplo.kafka.payment.transfer.ports; -import de.juplo.kafka.payment.transfer.domain.Transfer; +import de.juplo.kafka.payment.transfer.adapter.TransferStateChangedEvent; public interface HandleStateChangeUseCase { - void handleStateChange(Long id, Transfer.State state); + TransferStateChangedEvent handleStateChange(TransferStateChangedEvent stateChangedEvent); } diff --git a/src/main/java/de/juplo/kafka/payment/transfer/ports/TransferService.java b/src/main/java/de/juplo/kafka/payment/transfer/ports/TransferService.java index d4eb3ef..7ee0432 100644 --- a/src/main/java/de/juplo/kafka/payment/transfer/ports/TransferService.java +++ b/src/main/java/de/juplo/kafka/payment/transfer/ports/TransferService.java @@ -1,6 +1,7 @@ package de.juplo.kafka.payment.transfer.ports; +import de.juplo.kafka.payment.transfer.adapter.TransferStateChangedEvent; import de.juplo.kafka.payment.transfer.domain.Transfer; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -19,36 +20,35 @@ public class TransferService implements CreateTransferUseCase, HandleStateChange private final MessagingService messagingService; @Override - public void create(Long id, Long payer, Long payee, Integer amount) + public Optional create(Long id, Long payer, Long payee, Integer amount) { + return repository .get(id) - .ifPresentOrElse( - stored -> log.info( + .flatMap( + stored -> + {log.info( "transfer already exisits: {}, ignoring: id={}, payer={}, payee={}, amount={}", stored, payer, payee, - amount), - () -> + amount); + return Optional.empty(); + }) + .or(() -> { - Transfer transfer = - Transfer + log.info("creating transfer: {}", transfer); + return + TransferStateChangedEvent .builder() - .id(id) - .payer(payer) - .payee(payee) - .amount(amount) + .id(transfer.getId()) + .state(CREATED) .build(); - - log.info("creating transfer: {}", transfer); - repository.store(transfer); - messagingService.send(transfer.getId(), CREATED); }); } @Override - public void handleStateChange(Long id, Transfer.State state) + public TransferStateChangedEvent handleStateChange(Long id, Transfer.State state) { get(id) .ifPresentOrElse(