import de.juplo.kafka.payment.transfer.ports.*;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.common.TopicPartition;
import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
import static de.juplo.kafka.payment.transfer.domain.Transfer.State.CHECKED;
-import static de.juplo.kafka.payment.transfer.domain.Transfer.State.RECEIVED;
+import static de.juplo.kafka.payment.transfer.domain.Transfer.State.CREATED;
@Slf4j
@RequiredArgsConstructor
-public class TransferService implements ReceiveTransferUseCase, HandleTransferUseCase, GetTransferUseCase
+public class TransferService implements CreateTransferUseCase, HandleStateChangeUseCase, GetTransferUseCase
{
private final TransferRepository repository;
private final MessagingService messagingService;
- public CompletableFuture<TopicPartition> receive(Transfer transfer)
+ @Override
+ public void create(Transfer transfer)
{
- transfer.setState(RECEIVED);
- return messagingService.send(transfer);
+ repository
+ .get(transfer.getId())
+ .ifPresentOrElse(
+ stored -> log.info("transfer already exisits: {}, ignoring: {}", stored, transfer),
+ () ->
+ {
+ log.info("creating transfer: {}", transfer);
+ repository.store(transfer);
+ messagingService.send(transfer.getId(), CREATED);
+ });
}
@Override
Transfer.State state = transfer.getState();
switch (state)
{
- case RECEIVED:
+ case CREATED:
repository.store(transfer);
check(transfer);
break;
private void check(Transfer transfer)
{
- repository
- .get(transfer.getId())
- .ifPresentOrElse(
- stored ->
- {
- if (!transfer.equals(stored))
- log.error("ignoring already received transfer with differing data: old={}, new={}", stored, transfer);
- },
- () ->
- {
- repository.store(transfer);
- // TODO: Do some time consuming checks...
- transfer.setState(CHECKED);
- messagingService.send(transfer);
- });
+ // TODO: Do some time consuming checks...
+ messagingService.send(transfer.getId(), CHECKED);
}
public Optional<Transfer> get(Long id)