X-Git-Url: https://juplo.de/gitweb/?p=demos%2Fkafka%2Fdemos-kafka-payment-system-transfer;a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fpayment%2Ftransfer%2Fadapter%2FTransferConsumer.java;fp=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fpayment%2Ftransfer%2Fadapter%2FTransferConsumer.java;h=24f3e8893f259ba03d3d9f0823914804d070eefb;hp=e7c2430a2d3b5c4aa4eb6a3df865033b2284b347;hb=26809d379a0e024017f70db8c70382f94faf98b6;hpb=78721edb87827a1837f298f24761af8be640c58b diff --git a/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java b/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java index e7c2430..24f3e88 100644 --- a/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java +++ b/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java @@ -3,7 +3,9 @@ package de.juplo.kafka.payment.transfer.adapter; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import de.juplo.kafka.payment.transfer.domain.Transfer; -import de.juplo.kafka.payment.transfer.ports.HandleTransferUseCase; +import de.juplo.kafka.payment.transfer.ports.CreateTransferUseCase; +import de.juplo.kafka.payment.transfer.ports.GetTransferUseCase; +import de.juplo.kafka.payment.transfer.ports.HandleStateChangeUseCase; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -19,6 +21,8 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import static de.juplo.kafka.payment.transfer.domain.Transfer.State.CREATED; + @RequestMapping("/consumer") @ResponseBody @@ -30,7 +34,9 @@ public class TransferConsumer implements Runnable private final KafkaConsumer consumer; private final ExecutorService executorService; private final ObjectMapper mapper; - private final HandleTransferUseCase handleTransferUseCase; + private final GetTransferUseCase getTransferUseCase; + private final CreateTransferUseCase createTransferUseCase; + private final HandleStateChangeUseCase handleStateChangeUseCase; private boolean running = false; private Future future = null; @@ -51,8 +57,28 @@ public class TransferConsumer implements Runnable { try { - Transfer transfer = mapper.readValue(record.value(), Transfer.class); - handleTransferUseCase.handle(transfer); + byte eventType = record.headers().lastHeader(EventType.HEADER).value()[0]; + + switch (eventType) + { + case EventType.NEW_TRANSFER: + + NewTransferEvent newTransferEvent = + mapper.readValue(record.value(), NewTransferEvent.class); + createTransferUseCase.create(newTransferEvent.toTransfer().setState(CREATED)); + break; + + case EventType.TRANSFER_STATE_CHANGED: + + TransferStateChangedEvent stateChangedEvent = + mapper.readValue(record.value(), TransferStateChangedEvent.class); + getTransferUseCase + .get(stateChangedEvent.getId()) + .ifPresentOrElse( + transfer -> handleStateChangeUseCase.handle(transfer.setState(stateChangedEvent.getState())), + () -> log.error("unknown transfer: {}", stateChangedEvent.getId())); + break; + } } catch (JsonProcessingException e) {