From: Kai Moritz Date: Sat, 12 Jun 2021 08:47:29 +0000 (+0200) Subject: WIP X-Git-Tag: wip-initialer-commit~9 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=e99b64cc787c34c3fec438cf67434b8ea0d8cd43;p=demos%2Fkafka%2Fdemos-kafka-payment-system-transfer WIP --- diff --git a/transfer/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java b/transfer/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java index cc3e056..e439068 100644 --- a/transfer/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java +++ b/transfer/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java @@ -8,7 +8,6 @@ import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.common.serialization.UUIDSerializer; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.context.properties.EnableConfigurationProperties; @@ -24,11 +23,11 @@ import java.util.UUID; public class TransferServiceApplication { @Bean(destroyMethod = "close") - KafkaProducer producer(TransferServiceProperties properties) + KafkaProducer producer(TransferServiceProperties properties) { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.bootstrapServers); - props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, UUIDSerializer.class); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return new KafkaProducer<>(props); diff --git a/transfer/src/main/java/de/juplo/kafka/payment/transfer/domain/Transfer.java b/transfer/src/main/java/de/juplo/kafka/payment/transfer/domain/Transfer.java index 2aa87fe..878a233 100644 --- a/transfer/src/main/java/de/juplo/kafka/payment/transfer/domain/Transfer.java +++ b/transfer/src/main/java/de/juplo/kafka/payment/transfer/domain/Transfer.java @@ -2,16 +2,18 @@ package de.juplo.kafka.payment.transfer.domain; import lombok.Builder; -import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.Value; -@Data +@Value @Builder +@EqualsAndHashCode(exclude = "state") public class Transfer { public enum State { - CREATED, + SENT, FAILED, PENDING, APPROVED, diff --git a/transfer/src/main/java/de/juplo/kafka/payment/transfer/domain/TransferRepository.java b/transfer/src/main/java/de/juplo/kafka/payment/transfer/domain/TransferRepository.java index 908caa1..36d027c 100644 --- a/transfer/src/main/java/de/juplo/kafka/payment/transfer/domain/TransferRepository.java +++ b/transfer/src/main/java/de/juplo/kafka/payment/transfer/domain/TransferRepository.java @@ -1,13 +1,15 @@ package de.juplo.kafka.payment.transfer.domain; -import de.juplo.kafka.payment.transfer.domain.Transfer; - import java.util.Optional; -import java.util.UUID; public interface TransferRepository { - void put(Transfer transfer); + void store(Transfer transfer); + Optional get(Long id); + + void update(Long id, Transfer.State oldState, Transfer.State newState) throws IllegalArgumentException; + + void remove(Long id); } diff --git a/transfer/src/main/java/de/juplo/kafka/payment/transfer/domain/TransferService.java b/transfer/src/main/java/de/juplo/kafka/payment/transfer/domain/TransferService.java index 6545e29..f5d41cb 100644 --- a/transfer/src/main/java/de/juplo/kafka/payment/transfer/domain/TransferService.java +++ b/transfer/src/main/java/de/juplo/kafka/payment/transfer/domain/TransferService.java @@ -1,6 +1,7 @@ package de.juplo.kafka.payment.transfer.domain; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -11,13 +12,15 @@ import org.springframework.http.ResponseEntity; import java.net.URI; import java.util.UUID; +import static de.juplo.kafka.payment.transfer.domain.Transfer.State.*; + @Slf4j @RequiredArgsConstructor public class TransferService { private final TransferRepository repository; - private final KafkaProducer producer; + private final KafkaProducer producer; private final ObjectMapper mapper; private final String topic; @@ -28,45 +31,55 @@ public class TransferService .ifPresentOrElse( stored -> { - switch (stored.getState()) + if (!transfer.equals(stored)) + throw new IllegalArgumentException( + "Re-Initiation of transfer with different data: old=" + + stored + + ", new=" + + transfer); + + if (stored.getState() == FAILED) { - case FAILED: + repository.update(transfer.getId(), FAILED, SENT); + log.info("Resending faild transfer: " + stored); + send(transfer); } }, () -> { + repository.store(transfer); + send(transfer); }); } - private void create(Transfer transfer) + private void send(Transfer transfer) { try { - ProducerRecord record = + ProducerRecord record = new ProducerRecord<>( topic, - transfer.getId(), + Long.toString(transfer.getId()), mapper.writeValueAsString(transfer)); producer.send(record, (metadata, exception) -> { - if (exception != null) + if (metadata != null) { - log.error("Could not place order {}: {}", transfer, exception.toString()); - result.setErrorResult(exception); - return; + log.debug("Sent {} to {}/{}:{}", transfer, metadata.topic(), metadata.partition(), metadata.offset()); + repository.update(transfer.getId(), SENT, PENDING); + } + else + { + log.error("Could not send {}: {}", transfer, exception.getMessage()); + repository.update(transfer.getId(), SENT, FAILED); } - - result.setResult(ResponseEntity.created(URI.create(path + transfer.getId())).build()); }); } - catch (Exception e) + catch (JsonProcessingException e) { - log.error("Unexpected exception!", e); - result.setErrorResult(e); + throw new RuntimeException("Could not convert " + transfer, e); } - - return result; } }