From: Kai Moritz Date: Fri, 25 Jun 2021 14:15:53 +0000 (+0200) Subject: Bugfix: Check for existence of a new transfer requires a remote-call X-Git-Url: https://juplo.de/gitweb/?p=demos%2Fkafka%2Fdemos-kafka-payment-system-transfer;a=commitdiff_plain;h=19aec49a7f3a46f55e696a5a930c48883c4f1cd2;ds=sidebyside Bugfix: Check for existence of a new transfer requires a remote-call * The _local_ check for the existens of a transfer to create possibly leads to a NPE in this version, because it access the TransferRepository for a partition, not regarding, if it is available locally. * This access simply lead to no result before, but since the in-memory maps for the partitions are now created only, when the partition is assigned, it causes a NPE now. * The local check, that does not make a lot of sence since the service was refactored to run on multiple insances in parallel, is replaced against a remote call here. --- diff --git a/application.yml b/application.yml index daac570..eb412ec 100644 --- a/application.yml +++ b/application.yml @@ -1,5 +1,13 @@ +server: + port: 8091 juplo: transfer: group-instance-id: peter + instance-id-uri-mapping: + peter: http://localhost:8091 + ute: http://localhost:8092 + franz: http://localhost:8093 + beate: http://localhost:8094 + klaus: http://localhost:8095 state-store-interval: 15 local-state-store-path: state.bin diff --git a/pom.xml b/pom.xml index d094912..f536f3d 100644 --- a/pom.xml +++ b/pom.xml @@ -35,6 +35,10 @@ org.springframework.boot spring-boot-starter-web + + org.springframework.boot + spring-boot-starter-webflux + org.springframework.boot spring-boot-starter-validation diff --git a/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java b/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java index 5114a1c..a5c1faf 100644 --- a/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java +++ b/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java @@ -24,6 +24,7 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.context.annotation.Bean; import org.springframework.util.Assert; import org.springframework.util.StringUtils; +import org.springframework.web.reactive.function.client.WebClient; import java.io.File; import java.io.IOException; @@ -235,7 +236,11 @@ public class TransferServiceApplication KafkaMessagingService kafkaMessagingService, TransferConsumer transferConsumer) { - return new TransferController(productionTransferService, kafkaMessagingService, transferConsumer); + return new TransferController( + productionTransferService, + kafkaMessagingService, + transferConsumer, + WebClient.create()); } diff --git a/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java b/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java index aa00737..920d79a 100644 --- a/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java +++ b/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java @@ -2,6 +2,7 @@ package de.juplo.kafka.payment.transfer.adapter; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import de.juplo.kafka.payment.transfer.domain.Transfer; import de.juplo.kafka.payment.transfer.ports.CreateTransferUseCase; import de.juplo.kafka.payment.transfer.ports.GetTransferUseCase; import de.juplo.kafka.payment.transfer.ports.HandleStateChangeUseCase; @@ -191,6 +192,21 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener } + /** + * Identifies the URI, at which the Group-Instance can be reached, + * that holds the state for a specific {@link Transfer}. + * + * The {@link Transfer#getId() ID} of the {@link Transfer} is named + * {@code key} here and of type {@code String}, because this example + * project stores the key as a String in Kafka to simplify the listing + * and manual manipulation of the according topic. + * + * @param key A {@code String}, that represents the {@link Transfer#getId() ID} of a {@link Transfer}. + * @return An {@link Optional}, that holds the URI at which the Group-Instance + * can be reached, that holds the state for the {@link Transfer}, that + * is identified by the key (if present), or is empty, if the {@link Transfer} + * would be handled by the local instance. + */ public Optional uriForKey(String key) { synchronized (this) diff --git a/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferController.java b/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferController.java index d81c554..e60ba94 100644 --- a/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferController.java +++ b/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferController.java @@ -13,6 +13,8 @@ import org.springframework.validation.FieldError; import org.springframework.web.bind.MethodArgumentNotValidException; import org.springframework.web.bind.annotation.*; import org.springframework.web.context.request.async.DeferredResult; +import org.springframework.web.reactive.function.client.WebClient; +import reactor.core.publisher.Mono; import javax.servlet.http.HttpServletRequest; import javax.validation.Valid; @@ -35,21 +37,36 @@ import java.util.concurrent.CompletableFuture; private final GetTransferUseCase getTransferUseCase; private final MessagingService messagingService; private final TransferConsumer consumer; + private final WebClient webClient; @PostMapping( path = "", consumes = MediaType.APPLICATION_JSON_VALUE, produces = MediaType.APPLICATION_JSON_VALUE) - public DeferredResult> transfer( - HttpServletRequest request, - @Valid @RequestBody TransferDTO transferDTO) + public DeferredResult> transfer(@Valid @RequestBody TransferDTO transferDTO) { DeferredResult> result = new DeferredResult<>(); - getTransferUseCase - .get(transferDTO.getId()) - .map(transfer -> + Long id = transferDTO.getId(); + + consumer + .uriForKey(id.toString()) + .map(uri -> + webClient.get() + .uri(uri + PATH + "/" + id) + .accept(MediaType.APPLICATION_JSON) + .retrieve() + .onStatus(status -> true, bar -> Mono.empty()) + .toBodilessEntity() + .blockOptional() + .flatMap(resp -> + resp.getStatusCode().is2xxSuccessful() + ? Optional.of(Boolean.TRUE) + : Optional.empty())) + .or(() -> Optional.of(getTransferUseCase.get(transferDTO.getId()).map(transfer -> Boolean.TRUE))) + .flatMap(optional -> optional) + .map($ -> CompletableFuture.completedFuture( ResponseEntity .ok()