TransferController sends a message, instead of calling TransferService
authorKai Moritz <kai@juplo.de>
Sat, 19 Jun 2021 07:08:24 +0000 (09:08 +0200)
committerKai Moritz <kai@juplo.de>
Sat, 19 Jun 2021 16:02:03 +0000 (18:02 +0200)
src/main/java/de/juplo/kafka/payment/transfer/adapter/KafkaMessagingService.java
src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferController.java
src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferDTO.java
src/main/java/de/juplo/kafka/payment/transfer/domain/Transfer.java
src/main/java/de/juplo/kafka/payment/transfer/domain/TransferService.java
src/main/java/de/juplo/kafka/payment/transfer/ports/MessagingService.java
src/main/java/de/juplo/kafka/payment/transfer/ports/ReceiveTransferUseCase.java [deleted file]

index 0f8cf2b..3161af3 100644 (file)
@@ -23,7 +23,7 @@ public class KafkaMessagingService implements MessagingService
 
 
   @Override
 
 
   @Override
-  public CompletableFuture send(Transfer transfer)
+  public CompletableFuture<?> send(Transfer transfer)
   {
     try
     {
   {
     try
     {
index f31d1a8..8240310 100644 (file)
@@ -3,7 +3,7 @@ package de.juplo.kafka.payment.transfer.adapter;
 
 import de.juplo.kafka.payment.transfer.domain.Transfer;
 import de.juplo.kafka.payment.transfer.ports.GetTransferUseCase;
 
 import de.juplo.kafka.payment.transfer.domain.Transfer;
 import de.juplo.kafka.payment.transfer.ports.GetTransferUseCase;
-import de.juplo.kafka.payment.transfer.ports.ReceiveTransferUseCase;
+import de.juplo.kafka.payment.transfer.ports.MessagingService;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.http.HttpStatus;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.http.HttpStatus;
@@ -20,6 +20,8 @@ import java.net.URI;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 
 
 @RestController
 
 
 @RestController
@@ -29,8 +31,8 @@ import java.util.Map;
 {
   public final static String PATH = "/transfers";
 
 {
   public final static String PATH = "/transfers";
 
-  private final ReceiveTransferUseCase receiveTransferUseCase;
   private final GetTransferUseCase getTransferUseCase;
   private final GetTransferUseCase getTransferUseCase;
+  private final MessagingService messagingService;
 
 
   @PostMapping(
 
 
   @PostMapping(
@@ -41,36 +43,48 @@ import java.util.Map;
       HttpServletRequest request,
       @Valid @RequestBody TransferDTO transferDTO)
   {
       HttpServletRequest request,
       @Valid @RequestBody TransferDTO transferDTO)
   {
-    Transfer transfer =
-        Transfer
-            .builder()
-            .id(transferDTO.getId())
-            .payer(transferDTO.getPayer())
-            .payee(transferDTO.getPayee())
-            .amount(transferDTO.getAmount())
-            .build();
-
     DeferredResult<ResponseEntity<?>> result = new DeferredResult<>();
 
     DeferredResult<ResponseEntity<?>> result = new DeferredResult<>();
 
-    receiveTransferUseCase
-        .receive(transfer)
-        .thenApply(
-            $ ->
+    getTransferUseCase
+        .get(transferDTO.getId())
+        .map(transfer ->
+            CompletableFuture.completedFuture(
                 ResponseEntity
                 ResponseEntity
-                    .created(URI.create(PATH + "/" + transferDTO.getId()))
-                    .build())
-        .thenAccept(
-            responseEntity -> result.setResult(responseEntity))
-        .exceptionally(
-            e ->
-            {
-              result.setErrorResult(e);
-              return null;
-            });
+                    .ok()
+                    .location(location(transferDTO))
+                    .build()))
+        .or(() ->
+            Optional.of(
+                messagingService
+                    .send(
+                        Transfer
+                            .builder()
+                            .id(transferDTO.getId())
+                            .payer(transferDTO.getPayer())
+                            .payee(transferDTO.getPayee())
+                            .amount(transferDTO.getAmount())
+                            .state(Transfer.State.RECEIVED)
+                            .build())
+                    .thenApply($ ->
+                        ResponseEntity
+                            .created(location(transferDTO))
+                            .build())))
+        .get()
+        .thenAccept(responseEntity -> result.setResult(responseEntity))
+        .exceptionally(e ->
+        {
+          result.setErrorResult(e);
+          return null;
+        });
 
     return result;
   }
 
 
     return result;
   }
 
+  private URI location(TransferDTO transferDTO)
+  {
+    return URI.create(PATH + "/" + transferDTO.getId());
+  }
+
   @GetMapping(
       path = PATH + "/{id}",
       produces = MediaType.APPLICATION_JSON_VALUE)
   @GetMapping(
       path = PATH + "/{id}",
       produces = MediaType.APPLICATION_JSON_VALUE)
index 1119e72..ff50377 100644 (file)
@@ -31,19 +31,6 @@ public class TransferDTO
   private Transfer.State state;
 
 
   private Transfer.State state;
 
 
-  public Transfer toTransfer()
-  {
-    return
-        Transfer
-            .builder()
-            .id(id)
-            .payer(payer)
-            .payee(payee)
-            .amount(amount)
-            .build();
-  }
-
-
   public static TransferDTO of(Transfer transfer)
   {
     return
   public static TransferDTO of(Transfer transfer)
   {
     return
index 82891b7..cc207d9 100644 (file)
@@ -17,6 +17,7 @@ public class Transfer
   public enum State
   {
     RECEIVED(false),
   public enum State
   {
     RECEIVED(false),
+    CREATED(false),
     INVALID(false),
     CHECKED(false),
     APPROVED(true),
     INVALID(false),
     CHECKED(false),
     APPROVED(true),
index 0cbcd2c..90ef682 100644 (file)
@@ -1,29 +1,38 @@
 package de.juplo.kafka.payment.transfer.domain;
 
 
 package de.juplo.kafka.payment.transfer.domain;
 
 
-import de.juplo.kafka.payment.transfer.ports.*;
+import de.juplo.kafka.payment.transfer.ports.GetTransferUseCase;
+import de.juplo.kafka.payment.transfer.ports.HandleTransferUseCase;
+import de.juplo.kafka.payment.transfer.ports.MessagingService;
+import de.juplo.kafka.payment.transfer.ports.TransferRepository;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.common.TopicPartition;
 
 import java.util.Optional;
 
 import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
 
 import static de.juplo.kafka.payment.transfer.domain.Transfer.State.CHECKED;
 
 import static de.juplo.kafka.payment.transfer.domain.Transfer.State.CHECKED;
-import static de.juplo.kafka.payment.transfer.domain.Transfer.State.RECEIVED;
+import static de.juplo.kafka.payment.transfer.domain.Transfer.State.CREATED;
 
 
 @Slf4j
 @RequiredArgsConstructor
 
 
 @Slf4j
 @RequiredArgsConstructor
-public class TransferService implements ReceiveTransferUseCase, HandleTransferUseCase, GetTransferUseCase
+public class TransferService implements HandleTransferUseCase, GetTransferUseCase
 {
   private final TransferRepository repository;
   private final MessagingService messagingService;
 
 {
   private final TransferRepository repository;
   private final MessagingService messagingService;
 
-  public CompletableFuture<TopicPartition> receive(Transfer transfer)
+  private void create(Transfer transfer)
   {
   {
-    transfer.setState(RECEIVED);
-    return messagingService.send(transfer);
+    repository
+        .get(transfer.getId())
+        .ifPresentOrElse(
+            stored -> log.info("transfer already exisits: {}, ignoring: {}", stored, transfer),
+            () ->
+            {
+              repository.store(transfer);
+              transfer.setState(CREATED);
+              messagingService.send(transfer);
+            });
   }
 
   @Override
   }
 
   @Override
@@ -33,6 +42,11 @@ public class TransferService implements ReceiveTransferUseCase, HandleTransferUs
     switch (state)
     {
       case RECEIVED:
     switch (state)
     {
       case RECEIVED:
+        repository.store(transfer);
+        create(transfer);
+        break;
+
+      case CREATED:
         repository.store(transfer);
         check(transfer);
         break;
         repository.store(transfer);
         check(transfer);
         break;
@@ -49,21 +63,9 @@ public class TransferService implements ReceiveTransferUseCase, HandleTransferUs
 
   private void check(Transfer transfer)
   {
 
   private void check(Transfer transfer)
   {
-    repository
-        .get(transfer.getId())
-        .ifPresentOrElse(
-            stored ->
-            {
-              if (!transfer.equals(stored))
-                log.error("ignoring already received transfer with differing data: old={}, new={}", stored, transfer);
-            },
-            () ->
-            {
-              repository.store(transfer);
-              // TODO: Do some time consuming checks...
-              transfer.setState(CHECKED);
-              messagingService.send(transfer);
-            });
+    // TODO: Do some time consuming checks...
+    transfer.setState(CHECKED);
+    messagingService.send(transfer);
   }
 
   public Optional<Transfer> get(Long id)
   }
 
   public Optional<Transfer> get(Long id)
index 81e7555..4037a90 100644 (file)
@@ -7,5 +7,5 @@ import java.util.concurrent.CompletableFuture;
 
 public interface MessagingService
 {
 
 public interface MessagingService
 {
-  CompletableFuture send(Transfer transfer);
+  CompletableFuture<?> send(Transfer transfer);
 }
 }
diff --git a/src/main/java/de/juplo/kafka/payment/transfer/ports/ReceiveTransferUseCase.java b/src/main/java/de/juplo/kafka/payment/transfer/ports/ReceiveTransferUseCase.java
deleted file mode 100644 (file)
index f892fb3..0000000
+++ /dev/null
@@ -1,12 +0,0 @@
-package de.juplo.kafka.payment.transfer.ports;
-
-import de.juplo.kafka.payment.transfer.domain.Transfer;
-import org.apache.kafka.common.TopicPartition;
-
-import java.util.concurrent.CompletableFuture;
-
-
-public interface ReceiveTransferUseCase
-{
-  CompletableFuture<TopicPartition> receive(Transfer transfer);
-}