Introduced different Events for the creation and the state-changes
[demos/kafka/demos-kafka-payment-system-transfer] / src / main / java / de / juplo / kafka / payment / transfer / adapter / TransferConsumer.java
index e7c2430..24f3e88 100644 (file)
@@ -3,7 +3,9 @@ package de.juplo.kafka.payment.transfer.adapter;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import de.juplo.kafka.payment.transfer.domain.Transfer;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import de.juplo.kafka.payment.transfer.domain.Transfer;
-import de.juplo.kafka.payment.transfer.ports.HandleTransferUseCase;
+import de.juplo.kafka.payment.transfer.ports.CreateTransferUseCase;
+import de.juplo.kafka.payment.transfer.ports.GetTransferUseCase;
+import de.juplo.kafka.payment.transfer.ports.HandleStateChangeUseCase;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -19,6 +21,8 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 
+import static de.juplo.kafka.payment.transfer.domain.Transfer.State.CREATED;
+
 
 @RequestMapping("/consumer")
 @ResponseBody
 
 @RequestMapping("/consumer")
 @ResponseBody
@@ -30,7 +34,9 @@ public class TransferConsumer implements Runnable
   private final KafkaConsumer<String, String> consumer;
   private final ExecutorService executorService;
   private final ObjectMapper mapper;
   private final KafkaConsumer<String, String> consumer;
   private final ExecutorService executorService;
   private final ObjectMapper mapper;
-  private final HandleTransferUseCase handleTransferUseCase;
+  private final GetTransferUseCase getTransferUseCase;
+  private final CreateTransferUseCase createTransferUseCase;
+  private final HandleStateChangeUseCase handleStateChangeUseCase;
 
   private boolean running = false;
   private Future<?> future = null;
 
   private boolean running = false;
   private Future<?> future = null;
@@ -51,8 +57,28 @@ public class TransferConsumer implements Runnable
         {
           try
           {
         {
           try
           {
-            Transfer transfer = mapper.readValue(record.value(), Transfer.class);
-            handleTransferUseCase.handle(transfer);
+            byte eventType = record.headers().lastHeader(EventType.HEADER).value()[0];
+
+            switch (eventType)
+            {
+              case EventType.NEW_TRANSFER:
+
+                NewTransferEvent newTransferEvent =
+                    mapper.readValue(record.value(), NewTransferEvent.class);
+                createTransferUseCase.create(newTransferEvent.toTransfer().setState(CREATED));
+                break;
+
+              case EventType.TRANSFER_STATE_CHANGED:
+
+                TransferStateChangedEvent stateChangedEvent =
+                    mapper.readValue(record.value(), TransferStateChangedEvent.class);
+                getTransferUseCase
+                    .get(stateChangedEvent.getId())
+                    .ifPresentOrElse(
+                        transfer -> handleStateChangeUseCase.handle(transfer.setState(stateChangedEvent.getState())),
+                        () -> log.error("unknown transfer: {}", stateChangedEvent.getId()));
+                break;
+            }
           }
           catch (JsonProcessingException e)
           {
           }
           catch (JsonProcessingException e)
           {