Simplified the thread-execution
[demos/kafka/demos-kafka-payment-system-transfer] / src / main / java / de / juplo / kafka / payment / transfer / adapter / TransferConsumer.java
index 251588d..1fd2689 100644 (file)
@@ -2,7 +2,6 @@ package de.juplo.kafka.payment.transfer.adapter;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import de.juplo.kafka.payment.transfer.domain.Transfer;
 import de.juplo.kafka.payment.transfer.ports.CreateTransferUseCase;
 import de.juplo.kafka.payment.transfer.ports.GetTransferUseCase;
 import de.juplo.kafka.payment.transfer.ports.HandleStateChangeUseCase;
@@ -22,13 +21,11 @@ import org.springframework.web.bind.annotation.ResponseBody;
 import java.time.Duration;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.stream.Collectors;
 
-import static de.juplo.kafka.payment.transfer.domain.Transfer.State.CREATED;
-
 
 @RequestMapping("/consumer")
 @ResponseBody
@@ -38,7 +35,6 @@ public class TransferConsumer implements Runnable
 {
   private final String topic;
   private final KafkaConsumer<String, String> consumer;
-  private final ExecutorService executorService;
   private final ObjectMapper mapper;
   private final ConsumerUseCases productionUseCases, restoreUseCases;
 
@@ -83,18 +79,19 @@ public class TransferConsumer implements Runnable
 
           NewTransferEvent newTransferEvent =
               mapper.readValue(record.value(), NewTransferEvent.class);
-          useCases.create(newTransferEvent.toTransfer().setState(CREATED));
+          useCases
+              .create(
+                  newTransferEvent.getId(),
+                  newTransferEvent.getPayer(),
+                  newTransferEvent.getPayee(),
+                  newTransferEvent.getAmount());
           break;
 
         case EventType.TRANSFER_STATE_CHANGED:
 
           TransferStateChangedEvent stateChangedEvent =
               mapper.readValue(record.value(), TransferStateChangedEvent.class);
-          useCases
-              .get(stateChangedEvent.getId())
-              .ifPresentOrElse(
-                  transfer -> useCases.handle(transfer.setState(stateChangedEvent.getState())),
-                  () -> log.error("unknown transfer: {}", stateChangedEvent.getId()));
+          useCases.handleStateChange(stateChangedEvent.getId(), stateChangedEvent.getState());
           break;
       }
     }
@@ -107,6 +104,16 @@ public class TransferConsumer implements Runnable
           record.partition(),
           record.value());
     }
+    catch (IllegalArgumentException e)
+    {
+      log.error(
+          "ignoring invalid message #{} on {}/{}: {}, message={}",
+          record.offset(),
+          record.topic(),
+          record.partition(),
+          e.getMessage(),
+          record.value());
+    }
   }
 
   @EventListener
@@ -117,7 +124,7 @@ public class TransferConsumer implements Runnable
     // in the same thread, it would block the completion of the initialization.
     // Hence, the app would not react to any signal (CTRL-C, for example) except
     // a KILL until the restoring is finished.
-    future = executorService.submit(() -> restore());
+    future = CompletableFuture.runAsync(() -> restore());
   }
 
   private void restore()
@@ -206,7 +213,7 @@ public class TransferConsumer implements Runnable
     }
 
     running = true;
-    future = executorService.submit(this);
+    future = CompletableFuture.runAsync(this);
 
     log.info("started");
     return result;