TransferRepository does not need any synchronization
[demos/kafka/demos-kafka-payment-system-transfer] / src / main / java / de / juplo / kafka / payment / transfer / domain / TransferService.java
index 3e6265f..0cbcd2c 100644 (file)
@@ -1,26 +1,53 @@
 package de.juplo.kafka.payment.transfer.domain;
 
 
-import de.juplo.kafka.payment.transfer.ports.GetTransferUseCase;
-import de.juplo.kafka.payment.transfer.ports.InitiateTransferUseCase;
-import de.juplo.kafka.payment.transfer.ports.MessagingService;
-import de.juplo.kafka.payment.transfer.ports.TransferRepository;
+import de.juplo.kafka.payment.transfer.ports.*;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.common.TopicPartition;
 
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 
-import static de.juplo.kafka.payment.transfer.domain.Transfer.State.*;
+import static de.juplo.kafka.payment.transfer.domain.Transfer.State.CHECKED;
+import static de.juplo.kafka.payment.transfer.domain.Transfer.State.RECEIVED;
 
 
 @Slf4j
 @RequiredArgsConstructor
-public class TransferService implements InitiateTransferUseCase, GetTransferUseCase
+public class TransferService implements ReceiveTransferUseCase, HandleTransferUseCase, GetTransferUseCase
 {
   private final TransferRepository repository;
   private final MessagingService messagingService;
 
-  public synchronized void initiate(Transfer transfer)
+  public CompletableFuture<TopicPartition> receive(Transfer transfer)
+  {
+    transfer.setState(RECEIVED);
+    return messagingService.send(transfer);
+  }
+
+  @Override
+  public void handle(Transfer transfer)
+  {
+    Transfer.State state = transfer.getState();
+    switch (state)
+    {
+      case RECEIVED:
+        repository.store(transfer);
+        check(transfer);
+        break;
+
+      case CHECKED:
+        repository.store(transfer);
+        // TODO: What's next...?
+        break;
+
+      default:
+        log.warn("TODO: handle {} state {}", state.foreign ? "foreign" : "domain", state);
+    }
+  }
+
+  private void check(Transfer transfer)
   {
     repository
         .get(transfer.getId())
@@ -28,42 +55,14 @@ public class TransferService implements InitiateTransferUseCase, GetTransferUseC
             stored ->
             {
               if (!transfer.equals(stored))
-                throw new IllegalArgumentException(
-                    "Re-Initiation of transfer with different data: old=" +
-                        stored +
-                        ", new=" +
-                        transfer);
-
-              if (stored.getState() == FAILED)
-              {
-                repository.update(transfer.getId(), FAILED, SENT);
-                log.info("Resending faild transfer: " + stored);
-                send(transfer);
-              }
+                log.error("ignoring already received transfer with differing data: old={}, new={}", stored, transfer);
             },
             () ->
             {
-              send(transfer);
-              transfer.setState(SENT);
               repository.store(transfer);
-            });
-  }
-
-  private void send(Transfer transfer)
-  {
-    messagingService
-        .send(transfer)
-        .thenApply(
-            $ ->
-            {
-              repository.update(transfer.getId(), SENT, PENDING);
-              return null;
-            })
-        .exceptionally(
-            e ->
-            {
-              repository.update(transfer.getId(), SENT, FAILED);
-              return null;
+              // TODO: Do some time consuming checks...
+              transfer.setState(CHECKED);
+              messagingService.send(transfer);
             });
   }