package de.juplo.kafka.payment.transfer.domain;
-import de.juplo.kafka.payment.transfer.ports.GetTransferUseCase;
-import de.juplo.kafka.payment.transfer.ports.InitiateTransferUseCase;
-import de.juplo.kafka.payment.transfer.ports.MessagingService;
-import de.juplo.kafka.payment.transfer.ports.TransferRepository;
+import de.juplo.kafka.payment.transfer.ports.*;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.common.TopicPartition;
import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
-import static de.juplo.kafka.payment.transfer.domain.Transfer.State.*;
+import static de.juplo.kafka.payment.transfer.domain.Transfer.State.CHECKED;
+import static de.juplo.kafka.payment.transfer.domain.Transfer.State.RECEIVED;
@Slf4j
@RequiredArgsConstructor
-public class TransferService implements InitiateTransferUseCase, GetTransferUseCase
+public class TransferService implements ReceiveTransferUseCase, HandleTransferUseCase, GetTransferUseCase
{
private final TransferRepository repository;
private final MessagingService messagingService;
- public synchronized void initiate(Transfer transfer)
+ public CompletableFuture<TopicPartition> receive(Transfer transfer)
+ {
+ transfer.setState(RECEIVED);
+ return messagingService.send(transfer);
+ }
+
+ @Override
+ public void handle(Transfer transfer)
+ {
+ Transfer.State state = transfer.getState();
+ switch (state)
+ {
+ case RECEIVED:
+ repository.store(transfer);
+ check(transfer);
+ break;
+
+ case CHECKED:
+ repository.store(transfer);
+ // TODO: What's next...?
+ break;
+
+ default:
+ log.warn("TODO: handle {} state {}", state.foreign ? "foreign" : "domain", state);
+ }
+ }
+
+ private void check(Transfer transfer)
{
repository
.get(transfer.getId())
stored ->
{
if (!transfer.equals(stored))
- throw new IllegalArgumentException(
- "Re-Initiation of transfer with different data: old=" +
- stored +
- ", new=" +
- transfer);
-
- if (stored.getState() == FAILED)
- {
- repository.update(transfer.getId(), FAILED, SENT);
- log.info("Resending faild transfer: " + stored);
- send(transfer);
- }
+ log.error("ignoring already received transfer with differing data: old={}, new={}", stored, transfer);
},
() ->
{
- send(transfer);
- transfer.setState(SENT);
repository.store(transfer);
- });
- }
-
- private void send(Transfer transfer)
- {
- messagingService
- .send(transfer)
- .thenApply(
- $ ->
- {
- repository.update(transfer.getId(), SENT, PENDING);
- return null;
- })
- .exceptionally(
- e ->
- {
- repository.update(transfer.getId(), SENT, FAILED);
- return null;
+ // TODO: Do some time consuming checks...
+ transfer.setState(CHECKED);
+ messagingService.send(transfer);
});
}