X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=transfer%2Fsrc%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fpayment%2Ftransfer%2Fdomain%2FTransferService.java;h=f5d41cb0f2031c9207190d3cbd5b9a133df763ff;hb=e99b64cc787c34c3fec438cf67434b8ea0d8cd43;hp=6545e299cb7bae2ead945e0f888687855cca3d46;hpb=c3d8ff110b30df7013d9b9c8440c3591766ba072;p=demos%2Fkafka%2Fdemos-kafka-payment-system-transfer diff --git a/transfer/src/main/java/de/juplo/kafka/payment/transfer/domain/TransferService.java b/transfer/src/main/java/de/juplo/kafka/payment/transfer/domain/TransferService.java index 6545e29..f5d41cb 100644 --- a/transfer/src/main/java/de/juplo/kafka/payment/transfer/domain/TransferService.java +++ b/transfer/src/main/java/de/juplo/kafka/payment/transfer/domain/TransferService.java @@ -1,6 +1,7 @@ package de.juplo.kafka.payment.transfer.domain; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -11,13 +12,15 @@ import org.springframework.http.ResponseEntity; import java.net.URI; import java.util.UUID; +import static de.juplo.kafka.payment.transfer.domain.Transfer.State.*; + @Slf4j @RequiredArgsConstructor public class TransferService { private final TransferRepository repository; - private final KafkaProducer producer; + private final KafkaProducer producer; private final ObjectMapper mapper; private final String topic; @@ -28,45 +31,55 @@ public class TransferService .ifPresentOrElse( stored -> { - switch (stored.getState()) + if (!transfer.equals(stored)) + throw new IllegalArgumentException( + "Re-Initiation of transfer with different data: old=" + + stored + + ", new=" + + transfer); + + if (stored.getState() == FAILED) { - case FAILED: + repository.update(transfer.getId(), FAILED, SENT); + log.info("Resending faild transfer: " + stored); + send(transfer); } }, () -> { + repository.store(transfer); + send(transfer); }); } - private void create(Transfer transfer) + private void send(Transfer transfer) { try { - ProducerRecord record = + ProducerRecord record = new ProducerRecord<>( topic, - transfer.getId(), + Long.toString(transfer.getId()), mapper.writeValueAsString(transfer)); producer.send(record, (metadata, exception) -> { - if (exception != null) + if (metadata != null) { - log.error("Could not place order {}: {}", transfer, exception.toString()); - result.setErrorResult(exception); - return; + log.debug("Sent {} to {}/{}:{}", transfer, metadata.topic(), metadata.partition(), metadata.offset()); + repository.update(transfer.getId(), SENT, PENDING); + } + else + { + log.error("Could not send {}: {}", transfer, exception.getMessage()); + repository.update(transfer.getId(), SENT, FAILED); } - - result.setResult(ResponseEntity.created(URI.create(path + transfer.getId())).build()); }); } - catch (Exception e) + catch (JsonProcessingException e) { - log.error("Unexpected exception!", e); - result.setErrorResult(e); + throw new RuntimeException("Could not convert " + transfer, e); } - - return result; } }