TransferController sends a message, instead of calling TransferService
[demos/kafka/demos-kafka-payment-system-transfer] / src / main / java / de / juplo / kafka / payment / transfer / domain / TransferService.java
index 0cbcd2c..90ef682 100644 (file)
@@ -1,29 +1,38 @@
 package de.juplo.kafka.payment.transfer.domain;
 
 
-import de.juplo.kafka.payment.transfer.ports.*;
+import de.juplo.kafka.payment.transfer.ports.GetTransferUseCase;
+import de.juplo.kafka.payment.transfer.ports.HandleTransferUseCase;
+import de.juplo.kafka.payment.transfer.ports.MessagingService;
+import de.juplo.kafka.payment.transfer.ports.TransferRepository;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.common.TopicPartition;
 
 import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
 
 import static de.juplo.kafka.payment.transfer.domain.Transfer.State.CHECKED;
-import static de.juplo.kafka.payment.transfer.domain.Transfer.State.RECEIVED;
+import static de.juplo.kafka.payment.transfer.domain.Transfer.State.CREATED;
 
 
 @Slf4j
 @RequiredArgsConstructor
-public class TransferService implements ReceiveTransferUseCase, HandleTransferUseCase, GetTransferUseCase
+public class TransferService implements HandleTransferUseCase, GetTransferUseCase
 {
   private final TransferRepository repository;
   private final MessagingService messagingService;
 
-  public CompletableFuture<TopicPartition> receive(Transfer transfer)
+  private void create(Transfer transfer)
   {
-    transfer.setState(RECEIVED);
-    return messagingService.send(transfer);
+    repository
+        .get(transfer.getId())
+        .ifPresentOrElse(
+            stored -> log.info("transfer already exisits: {}, ignoring: {}", stored, transfer),
+            () ->
+            {
+              repository.store(transfer);
+              transfer.setState(CREATED);
+              messagingService.send(transfer);
+            });
   }
 
   @Override
@@ -33,6 +42,11 @@ public class TransferService implements ReceiveTransferUseCase, HandleTransferUs
     switch (state)
     {
       case RECEIVED:
+        repository.store(transfer);
+        create(transfer);
+        break;
+
+      case CREATED:
         repository.store(transfer);
         check(transfer);
         break;
@@ -49,21 +63,9 @@ public class TransferService implements ReceiveTransferUseCase, HandleTransferUs
 
   private void check(Transfer transfer)
   {
-    repository
-        .get(transfer.getId())
-        .ifPresentOrElse(
-            stored ->
-            {
-              if (!transfer.equals(stored))
-                log.error("ignoring already received transfer with differing data: old={}, new={}", stored, transfer);
-            },
-            () ->
-            {
-              repository.store(transfer);
-              // TODO: Do some time consuming checks...
-              transfer.setState(CHECKED);
-              messagingService.send(transfer);
-            });
+    // TODO: Do some time consuming checks...
+    transfer.setState(CHECKED);
+    messagingService.send(transfer);
   }
 
   public Optional<Transfer> get(Long id)