X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=transfer%2Fsrc%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fpayment%2Ftransfer%2Fdomain%2FTransferService.java;h=d70882640c934f5d7ea8122441ff3a05c6bbe251;hb=abea9b5c5ff90b2db1ab7bd577d644c05a4dd37f;hp=6545e299cb7bae2ead945e0f888687855cca3d46;hpb=c3d8ff110b30df7013d9b9c8440c3591766ba072;p=demos%2Fkafka%2Fdemos-kafka-payment-system-transfer diff --git a/transfer/src/main/java/de/juplo/kafka/payment/transfer/domain/TransferService.java b/transfer/src/main/java/de/juplo/kafka/payment/transfer/domain/TransferService.java index 6545e29..d708826 100644 --- a/transfer/src/main/java/de/juplo/kafka/payment/transfer/domain/TransferService.java +++ b/transfer/src/main/java/de/juplo/kafka/payment/transfer/domain/TransferService.java @@ -1,15 +1,16 @@ package de.juplo.kafka.payment.transfer.domain; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; -import org.springframework.http.ResponseEntity; -import java.net.URI; -import java.util.UUID; +import java.util.Optional; + +import static de.juplo.kafka.payment.transfer.domain.Transfer.State.*; @Slf4j @@ -17,7 +18,7 @@ import java.util.UUID; public class TransferService { private final TransferRepository repository; - private final KafkaProducer producer; + private final KafkaProducer producer; private final ObjectMapper mapper; private final String topic; @@ -28,45 +29,61 @@ public class TransferService .ifPresentOrElse( stored -> { - switch (stored.getState()) + if (!transfer.equals(stored)) + throw new IllegalArgumentException( + "Re-Initiation of transfer with different data: old=" + + stored + + ", new=" + + transfer); + + if (stored.getState() == FAILED) { - case FAILED: + repository.update(transfer.getId(), FAILED, SENT); + log.info("Resending faild transfer: " + stored); + send(transfer); } }, () -> { + send(transfer); + transfer.setState(SENT); + repository.store(transfer); }); } - private void create(Transfer transfer) + private void send(Transfer transfer) { try { - ProducerRecord record = + ProducerRecord record = new ProducerRecord<>( topic, - transfer.getId(), + Long.toString(transfer.getId()), mapper.writeValueAsString(transfer)); producer.send(record, (metadata, exception) -> { - if (exception != null) + if (metadata != null) { - log.error("Could not place order {}: {}", transfer, exception.toString()); - result.setErrorResult(exception); - return; + log.debug("Sent {} to {}/{}:{}", transfer, metadata.topic(), metadata.partition(), metadata.offset()); + repository.update(transfer.getId(), SENT, PENDING); + } + else + { + log.error("Could not send {}: {}", transfer, exception.getMessage()); + repository.update(transfer.getId(), SENT, FAILED); } - - result.setResult(ResponseEntity.created(URI.create(path + transfer.getId())).build()); }); } - catch (Exception e) + catch (JsonProcessingException e) { - log.error("Unexpected exception!", e); - result.setErrorResult(e); + throw new RuntimeException("Could not convert " + transfer, e); } + } - return result; + public Optional get(Long id) + { + return repository.get(id); } }