+server:
+ port: 8091
juplo:
transfer:
group-instance-id: peter
+ instance-id-uri-mapping:
+ peter: http://localhost:8091
+ ute: http://localhost:8092
+ franz: http://localhost:8093
+ beate: http://localhost:8094
+ klaus: http://localhost:8095
state-store-interval: 15
local-state-store-path: state.bin
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-webflux</artifactId>
+ </dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
import org.springframework.context.annotation.Bean;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
+import org.springframework.web.reactive.function.client.WebClient;
import java.io.File;
import java.io.IOException;
KafkaMessagingService kafkaMessagingService,
TransferConsumer transferConsumer)
{
- return new TransferController(productionTransferService, kafkaMessagingService, transferConsumer);
+ return new TransferController(
+ productionTransferService,
+ kafkaMessagingService,
+ transferConsumer,
+ WebClient.create());
}
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
+import de.juplo.kafka.payment.transfer.domain.Transfer;
import de.juplo.kafka.payment.transfer.ports.CreateTransferUseCase;
import de.juplo.kafka.payment.transfer.ports.GetTransferUseCase;
import de.juplo.kafka.payment.transfer.ports.HandleStateChangeUseCase;
}
+ /**
+ * Identifies the URI, at which the Group-Instance can be reached,
+ * that holds the state for a specific {@link Transfer}.
+ *
+ * The {@link Transfer#getId() ID} of the {@link Transfer} is named
+ * {@code key} here and of type {@code String}, because this example
+ * project stores the key as a String in Kafka to simplify the listing
+ * and manual manipulation of the according topic.
+ *
+ * @param key A {@code String}, that represents the {@link Transfer#getId() ID} of a {@link Transfer}.
+ * @return An {@link Optional}, that holds the URI at which the Group-Instance
+ * can be reached, that holds the state for the {@link Transfer}, that
+ * is identified by the key (if present), or is empty, if the {@link Transfer}
+ * would be handled by the local instance.
+ */
public Optional<String> uriForKey(String key)
{
synchronized (this)
import org.springframework.web.bind.MethodArgumentNotValidException;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.context.request.async.DeferredResult;
+import org.springframework.web.reactive.function.client.WebClient;
+import reactor.core.publisher.Mono;
import javax.servlet.http.HttpServletRequest;
import javax.validation.Valid;
private final GetTransferUseCase getTransferUseCase;
private final MessagingService messagingService;
private final TransferConsumer consumer;
+ private final WebClient webClient;
@PostMapping(
path = "",
consumes = MediaType.APPLICATION_JSON_VALUE,
produces = MediaType.APPLICATION_JSON_VALUE)
- public DeferredResult<ResponseEntity<?>> transfer(
- HttpServletRequest request,
- @Valid @RequestBody TransferDTO transferDTO)
+ public DeferredResult<ResponseEntity<?>> transfer(@Valid @RequestBody TransferDTO transferDTO)
{
DeferredResult<ResponseEntity<?>> result = new DeferredResult<>();
- getTransferUseCase
- .get(transferDTO.getId())
- .map(transfer ->
+ Long id = transferDTO.getId();
+
+ consumer
+ .uriForKey(id.toString())
+ .map(uri ->
+ webClient.get()
+ .uri(uri + PATH + "/" + id)
+ .accept(MediaType.APPLICATION_JSON)
+ .retrieve()
+ .onStatus(status -> true, bar -> Mono.empty())
+ .toBodilessEntity()
+ .blockOptional()
+ .flatMap(resp ->
+ resp.getStatusCode().is2xxSuccessful()
+ ? Optional.of(Boolean.TRUE)
+ : Optional.<Boolean>empty()))
+ .or(() -> Optional.of(getTransferUseCase.get(transferDTO.getId()).map(transfer -> Boolean.TRUE)))
+ .flatMap(optional -> optional)
+ .map($ ->
CompletableFuture.completedFuture(
ResponseEntity
.ok()