]> juplo.de Git - demos/kafka/demos-kafka-payment-system-transfer/commitdiff
Bugfix: Check for existence of a new transfer requires a remote-call
authorKai Moritz <kai@juplo.de>
Fri, 25 Jun 2021 14:15:53 +0000 (16:15 +0200)
committerKai Moritz <kai@juplo.de>
Wed, 30 Jun 2021 16:56:18 +0000 (18:56 +0200)
* The _local_ check for the existens of a transfer to create possibly leads
  to a NPE in this version, because it access the TransferRepository for a
  partition, not regarding, if it is available locally.
* This access simply lead to no result before, but since the in-memory
  maps for the partitions are now created only, when the partition is
  assigned, it causes a NPE now.
* The local check, that does not make a lot of sence since the service
  was refactored to run on multiple insances in parallel, is replaced
  against a remote call here.

application.yml
pom.xml
src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java
src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java
src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferController.java

index daac5708bf0676fc0fd355a567ef5a49e1ea0f05..eb412ecd08066c802d8f03d43a75e3803c4d5b40 100644 (file)
@@ -1,5 +1,13 @@
+server:
+  port: 8091
 juplo:
   transfer:
     group-instance-id: peter
+    instance-id-uri-mapping:
+      peter: http://localhost:8091
+      ute:   http://localhost:8092
+      franz: http://localhost:8093
+      beate: http://localhost:8094
+      klaus: http://localhost:8095
     state-store-interval: 15
     local-state-store-path: state.bin
diff --git a/pom.xml b/pom.xml
index d094912614c611e909c46a47b9a683f2e06b2905..f536f3d1cf868b8de7d2cb8c92ef03efc3c00f1d 100644 (file)
--- a/pom.xml
+++ b/pom.xml
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-web</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-starter-webflux</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-validation</artifactId>
index 5114a1c4fff0685bc016ec1c181f898a515f919c..a5c1faf323101462289a7250e2aa883d955fe531 100644 (file)
@@ -24,6 +24,7 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties
 import org.springframework.context.annotation.Bean;
 import org.springframework.util.Assert;
 import org.springframework.util.StringUtils;
+import org.springframework.web.reactive.function.client.WebClient;
 
 import java.io.File;
 import java.io.IOException;
@@ -235,7 +236,11 @@ public class TransferServiceApplication
       KafkaMessagingService kafkaMessagingService,
       TransferConsumer transferConsumer)
   {
-    return new TransferController(productionTransferService, kafkaMessagingService, transferConsumer);
+    return new TransferController(
+        productionTransferService,
+        kafkaMessagingService,
+        transferConsumer,
+        WebClient.create());
   }
 
 
index aa00737adc0717d8cf62de2bcb0ad47f5832968c..920d79ae480b6bafdc889839fda7002abffb9ec5 100644 (file)
@@ -2,6 +2,7 @@ package de.juplo.kafka.payment.transfer.adapter;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import de.juplo.kafka.payment.transfer.domain.Transfer;
 import de.juplo.kafka.payment.transfer.ports.CreateTransferUseCase;
 import de.juplo.kafka.payment.transfer.ports.GetTransferUseCase;
 import de.juplo.kafka.payment.transfer.ports.HandleStateChangeUseCase;
@@ -191,6 +192,21 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener
   }
 
 
+  /**
+   * Identifies the URI, at which the Group-Instance can be reached,
+   * that holds the state for a specific {@link Transfer}.
+   *
+   * The {@link Transfer#getId() ID} of the {@link Transfer} is named
+   * {@code key} here and of type {@code String}, because this example
+   * project stores the key as a String in Kafka to simplify the listing
+   * and manual manipulation of the according topic.
+   *
+   * @param key A {@code String}, that represents the {@link Transfer#getId() ID} of a {@link Transfer}.
+   * @return An {@link Optional}, that holds the URI at which the Group-Instance
+   * can be reached, that holds the state for the {@link Transfer}, that
+   * is identified by the key (if present), or is empty, if the {@link Transfer}
+   * would be handled by the local instance.
+   */
   public Optional<String> uriForKey(String key)
   {
     synchronized (this)
index d81c554043cefbfabd464b2bb2bb56c92d08d08a..e60ba949da5bf7e3be4f6ae83ffffebfbfa8ba64 100644 (file)
@@ -13,6 +13,8 @@ import org.springframework.validation.FieldError;
 import org.springframework.web.bind.MethodArgumentNotValidException;
 import org.springframework.web.bind.annotation.*;
 import org.springframework.web.context.request.async.DeferredResult;
+import org.springframework.web.reactive.function.client.WebClient;
+import reactor.core.publisher.Mono;
 
 import javax.servlet.http.HttpServletRequest;
 import javax.validation.Valid;
@@ -35,21 +37,36 @@ import java.util.concurrent.CompletableFuture;
   private final GetTransferUseCase getTransferUseCase;
   private final MessagingService messagingService;
   private final TransferConsumer consumer;
+  private final WebClient webClient;
 
 
   @PostMapping(
       path = "",
       consumes = MediaType.APPLICATION_JSON_VALUE,
       produces = MediaType.APPLICATION_JSON_VALUE)
-  public DeferredResult<ResponseEntity<?>> transfer(
-      HttpServletRequest request,
-      @Valid @RequestBody TransferDTO transferDTO)
+  public DeferredResult<ResponseEntity<?>> transfer(@Valid @RequestBody TransferDTO transferDTO)
   {
     DeferredResult<ResponseEntity<?>> result = new DeferredResult<>();
 
-    getTransferUseCase
-        .get(transferDTO.getId())
-        .map(transfer ->
+    Long id = transferDTO.getId();
+
+    consumer
+        .uriForKey(id.toString())
+        .map(uri ->
+            webClient.get()
+                .uri(uri + PATH + "/" + id)
+                .accept(MediaType.APPLICATION_JSON)
+                .retrieve()
+                .onStatus(status -> true, bar -> Mono.empty())
+                .toBodilessEntity()
+                .blockOptional()
+                .flatMap(resp ->
+                    resp.getStatusCode().is2xxSuccessful()
+                        ? Optional.of(Boolean.TRUE)
+                        : Optional.<Boolean>empty()))
+        .or(() -> Optional.of(getTransferUseCase.get(transferDTO.getId()).map(transfer -> Boolean.TRUE)))
+        .flatMap(optional -> optional)
+        .map($ ->
             CompletableFuture.completedFuture(
                 ResponseEntity
                     .ok()