import de.juplo.kafka.payment.transfer.domain.Transfer;
import de.juplo.kafka.payment.transfer.ports.GetTransferUseCase;
-import de.juplo.kafka.payment.transfer.ports.ReceiveTransferUseCase;
+import de.juplo.kafka.payment.transfer.ports.MessagingService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpStatus;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
@RestController
{
public final static String PATH = "/transfers";
- private final ReceiveTransferUseCase receiveTransferUseCase;
private final GetTransferUseCase getTransferUseCase;
+ private final MessagingService messagingService;
@PostMapping(
HttpServletRequest request,
@Valid @RequestBody TransferDTO transferDTO)
{
- Transfer transfer =
- Transfer
- .builder()
- .id(transferDTO.getId())
- .payer(transferDTO.getPayer())
- .payee(transferDTO.getPayee())
- .amount(transferDTO.getAmount())
- .build();
-
DeferredResult<ResponseEntity<?>> result = new DeferredResult<>();
- receiveTransferUseCase
- .receive(transfer)
- .thenApply(
- $ ->
+ getTransferUseCase
+ .get(transferDTO.getId())
+ .map(transfer ->
+ CompletableFuture.completedFuture(
ResponseEntity
- .created(URI.create(PATH + "/" + transferDTO.getId()))
- .build())
- .thenAccept(
- responseEntity -> result.setResult(responseEntity))
- .exceptionally(
- e ->
- {
- result.setErrorResult(e);
- return null;
- });
+ .ok()
+ .location(location(transferDTO))
+ .build()))
+ .or(() ->
+ Optional.of(
+ messagingService
+ .send(
+ Transfer
+ .builder()
+ .id(transferDTO.getId())
+ .payer(transferDTO.getPayer())
+ .payee(transferDTO.getPayee())
+ .amount(transferDTO.getAmount())
+ .state(Transfer.State.RECEIVED)
+ .build())
+ .thenApply($ ->
+ ResponseEntity
+ .created(location(transferDTO))
+ .build())))
+ .get()
+ .thenAccept(responseEntity -> result.setResult(responseEntity))
+ .exceptionally(e ->
+ {
+ result.setErrorResult(e);
+ return null;
+ });
return result;
}
+ private URI location(TransferDTO transferDTO)
+ {
+ return URI.create(PATH + "/" + transferDTO.getId());
+ }
+
@GetMapping(
path = PATH + "/{id}",
produces = MediaType.APPLICATION_JSON_VALUE)
package de.juplo.kafka.payment.transfer.domain;
-import de.juplo.kafka.payment.transfer.ports.*;
+import de.juplo.kafka.payment.transfer.ports.GetTransferUseCase;
+import de.juplo.kafka.payment.transfer.ports.HandleTransferUseCase;
+import de.juplo.kafka.payment.transfer.ports.MessagingService;
+import de.juplo.kafka.payment.transfer.ports.TransferRepository;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.common.TopicPartition;
import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
import static de.juplo.kafka.payment.transfer.domain.Transfer.State.CHECKED;
-import static de.juplo.kafka.payment.transfer.domain.Transfer.State.RECEIVED;
+import static de.juplo.kafka.payment.transfer.domain.Transfer.State.CREATED;
@Slf4j
@RequiredArgsConstructor
-public class TransferService implements ReceiveTransferUseCase, HandleTransferUseCase, GetTransferUseCase
+public class TransferService implements HandleTransferUseCase, GetTransferUseCase
{
private final TransferRepository repository;
private final MessagingService messagingService;
- public CompletableFuture<TopicPartition> receive(Transfer transfer)
+ private void create(Transfer transfer)
{
- transfer.setState(RECEIVED);
- return messagingService.send(transfer);
+ repository
+ .get(transfer.getId())
+ .ifPresentOrElse(
+ stored -> log.info("transfer already exisits: {}, ignoring: {}", stored, transfer),
+ () ->
+ {
+ repository.store(transfer);
+ transfer.setState(CREATED);
+ messagingService.send(transfer);
+ });
}
@Override
switch (state)
{
case RECEIVED:
+ repository.store(transfer);
+ create(transfer);
+ break;
+
+ case CREATED:
repository.store(transfer);
check(transfer);
break;
private void check(Transfer transfer)
{
- repository
- .get(transfer.getId())
- .ifPresentOrElse(
- stored ->
- {
- if (!transfer.equals(stored))
- log.error("ignoring already received transfer with differing data: old={}, new={}", stored, transfer);
- },
- () ->
- {
- repository.store(transfer);
- // TODO: Do some time consuming checks...
- transfer.setState(CHECKED);
- messagingService.send(transfer);
- });
+ // TODO: Do some time consuming checks...
+ transfer.setState(CHECKED);
+ messagingService.send(transfer);
}
public Optional<Transfer> get(Long id)