Switched from single-node (assign) to multi-instance (subscribe)
[demos/kafka/demos-kafka-payment-system-transfer] / src / main / java / de / juplo / kafka / payment / transfer / adapter / TransferController.java
index e20f9bf..d81c554 100644 (file)
@@ -3,7 +3,7 @@ package de.juplo.kafka.payment.transfer.adapter;
 
 import de.juplo.kafka.payment.transfer.domain.Transfer;
 import de.juplo.kafka.payment.transfer.ports.GetTransferUseCase;
-import de.juplo.kafka.payment.transfer.ports.InitiateTransferUseCase;
+import de.juplo.kafka.payment.transfer.ports.MessagingService;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.http.HttpStatus;
@@ -12,6 +12,7 @@ import org.springframework.http.ResponseEntity;
 import org.springframework.validation.FieldError;
 import org.springframework.web.bind.MethodArgumentNotValidException;
 import org.springframework.web.bind.annotation.*;
+import org.springframework.web.context.request.async.DeferredResult;
 
 import javax.servlet.http.HttpServletRequest;
 import javax.validation.Valid;
@@ -19,49 +20,94 @@ import java.net.URI;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 
 
-@RestController
+@RequestMapping(TransferController.PATH)
+@ResponseBody
 @RequiredArgsConstructor
 @Slf4j
  public class TransferController
 {
   public final static String PATH = "/transfers";
 
-  private final InitiateTransferUseCase initiateTransferUseCase;
   private final GetTransferUseCase getTransferUseCase;
+  private final MessagingService messagingService;
+  private final TransferConsumer consumer;
 
 
   @PostMapping(
-      path = PATH,
+      path = "",
       consumes = MediaType.APPLICATION_JSON_VALUE,
       produces = MediaType.APPLICATION_JSON_VALUE)
-  public ResponseEntity<?> transfer(@Valid @RequestBody TransferDTO transferDTO)
+  public DeferredResult<ResponseEntity<?>> transfer(
+      HttpServletRequest request,
+      @Valid @RequestBody TransferDTO transferDTO)
   {
-    Transfer transfer =
-        Transfer
-            .builder()
-            .id(transferDTO.getId())
-            .payer(transferDTO.getPayer())
-            .payee(transferDTO.getPayee())
-            .amount(transferDTO.getAmount())
-            .build();
+    DeferredResult<ResponseEntity<?>> result = new DeferredResult<>();
+
+    getTransferUseCase
+        .get(transferDTO.getId())
+        .map(transfer ->
+            CompletableFuture.completedFuture(
+                ResponseEntity
+                    .ok()
+                    .location(location(transferDTO))
+                    .build()))
+        .or(() ->
+            Optional.of(
+                messagingService
+                    .send(
+                        Transfer
+                            .builder()
+                            .id(transferDTO.getId())
+                            .payer(transferDTO.getPayer())
+                            .payee(transferDTO.getPayee())
+                            .amount(transferDTO.getAmount())
+                            .build())
+                    .thenApply($ ->
+                        ResponseEntity
+                            .created(location(transferDTO))
+                            .build())))
+        .get()
+        .thenAccept(responseEntity -> result.setResult(responseEntity))
+        .exceptionally(e ->
+        {
+          result.setErrorResult(e);
+          return null;
+        });
 
-    initiateTransferUseCase.initiate(transfer);
+    return result;
+  }
 
-    return ResponseEntity.created(URI.create(PATH + "/" + transferDTO.getId())).build();
+  private URI location(TransferDTO transferDTO)
+  {
+    return URI.create(PATH + "/" + transferDTO.getId());
   }
 
   @GetMapping(
-      path = PATH + "/{id}",
+      path = "/{id}",
       produces = MediaType.APPLICATION_JSON_VALUE)
   public ResponseEntity<TransferDTO> get(@PathVariable Long id)
   {
     return
-        getTransferUseCase
-            .get(id)
-            .map(transfer -> ResponseEntity.ok(TransferDTO.of(transfer)))
-            .orElse(ResponseEntity.notFound().build());
+        consumer
+            .uriForKey(Long.toString(id))
+            .map(uri ->
+            {
+              ResponseEntity<TransferDTO> response =
+                  ResponseEntity
+                      .status(HttpStatus.TEMPORARY_REDIRECT)
+                      .location(URI.create(uri + PATH + "/" + id))
+                      .build();
+              return response;
+            })
+            .orElseGet(() ->
+                getTransferUseCase
+                    .get(id)
+                    .map(transfer -> ResponseEntity.ok(TransferDTO.of(transfer)))
+                    .orElse(ResponseEntity.notFound().build()));
   }
 
   @ResponseStatus(HttpStatus.BAD_REQUEST)