X-Git-Url: https://juplo.de/gitweb/?p=demos%2Fkafka%2Fdemos-kafka-payment-system-transfer;a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fpayment%2Ftransfer%2Fadapter%2FTransferConsumer.java;fp=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fpayment%2Ftransfer%2Fadapter%2FTransferConsumer.java;h=63fbef5f523bb8a7d9cc1f838e4d51726ace008d;hp=251588d2ea0e6c9fb826265130c856f68685cc12;hb=951dcf0533551cccf48062c12e61192035a27a9a;hpb=edc88d6eac8c502ab0297380489ccc9ba706b5f0 diff --git a/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java b/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java index 251588d..63fbef5 100644 --- a/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java +++ b/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java @@ -2,7 +2,6 @@ package de.juplo.kafka.payment.transfer.adapter; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -import de.juplo.kafka.payment.transfer.domain.Transfer; import de.juplo.kafka.payment.transfer.ports.CreateTransferUseCase; import de.juplo.kafka.payment.transfer.ports.GetTransferUseCase; import de.juplo.kafka.payment.transfer.ports.HandleStateChangeUseCase; @@ -27,8 +26,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.stream.Collectors; -import static de.juplo.kafka.payment.transfer.domain.Transfer.State.CREATED; - @RequestMapping("/consumer") @ResponseBody @@ -83,18 +80,19 @@ public class TransferConsumer implements Runnable NewTransferEvent newTransferEvent = mapper.readValue(record.value(), NewTransferEvent.class); - useCases.create(newTransferEvent.toTransfer().setState(CREATED)); + useCases + .create( + newTransferEvent.getId(), + newTransferEvent.getPayer(), + newTransferEvent.getPayee(), + newTransferEvent.getAmount()); break; case EventType.TRANSFER_STATE_CHANGED: TransferStateChangedEvent stateChangedEvent = mapper.readValue(record.value(), TransferStateChangedEvent.class); - useCases - .get(stateChangedEvent.getId()) - .ifPresentOrElse( - transfer -> useCases.handle(transfer.setState(stateChangedEvent.getState())), - () -> log.error("unknown transfer: {}", stateChangedEvent.getId())); + useCases.handleStateChange(stateChangedEvent.getId(), stateChangedEvent.getState()); break; } }