From: Kai Moritz Date: Fri, 11 Jun 2021 15:38:21 +0000 (+0200) Subject: WIP X-Git-Tag: wip-initialer-commit~12 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=aa415473c0fced1fc60b876f4afa1909c34c5926;p=demos%2Fkafka%2Fdemos-kafka-payment-system-setup WIP --- diff --git a/transfer/src/main/java/de/juplo/kafka/payment/transfer/api/Transfer.java b/transfer/src/main/java/de/juplo/kafka/payment/transfer/api/Transfer.java new file mode 100644 index 0000000..58ca15a --- /dev/null +++ b/transfer/src/main/java/de/juplo/kafka/payment/transfer/api/Transfer.java @@ -0,0 +1,23 @@ +package de.juplo.kafka.payment.transfer.api; + +import lombok.Data; + +import javax.validation.constraints.NotNull; +import java.util.UUID; + + +/** + * Simple DTO used by the REST interface + */ +@Data +public class Transfer +{ + @NotNull(message = "Cannot be null") + private UUID id; + @NotNull(message = "Cannot be null") + private long payer; + @NotNull(message = "Cannot be null") + private long payee; + @NotNull(message = "Cannot be null") + private int amount; +} diff --git a/transfer/src/main/java/de/juplo/kafka/payment/transfer/api/TransferService.java b/transfer/src/main/java/de/juplo/kafka/payment/transfer/api/TransferService.java new file mode 100644 index 0000000..e4bb4f8 --- /dev/null +++ b/transfer/src/main/java/de/juplo/kafka/payment/transfer/api/TransferService.java @@ -0,0 +1,6 @@ +package de.juplo.kafka.payment.transfer.api; + +public interface TransferService +{ + void handleTransfer(Transfer transfer); +} diff --git a/transfer/src/main/java/de/juplo/kafka/payment/transfer/controller/TransferController.java b/transfer/src/main/java/de/juplo/kafka/payment/transfer/controller/TransferController.java new file mode 100644 index 0000000..c93fb46 --- /dev/null +++ b/transfer/src/main/java/de/juplo/kafka/payment/transfer/controller/TransferController.java @@ -0,0 +1,37 @@ +package de.juplo.kafka.payment.transfer.controller; + + +import de.juplo.kafka.payment.transfer.api.Transfer; +import de.juplo.kafka.payment.transfer.impl.TransferServiceImpl; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RestController; + +import javax.validation.Valid; +import java.net.URI; + + +@RestController +@RequiredArgsConstructor +@Slf4j +public class TransferController +{ + public final static String PATH = "/transfers"; + + private final TransferServiceImpl service; + + + @PostMapping( + path = PATH, + consumes = MediaType.APPLICATION_JSON_VALUE, + produces = MediaType.APPLICATION_JSON_VALUE) + public ResponseEntity transfer(@Valid @RequestBody Transfer transfer) + { + service.initiate(transfer); + return ResponseEntity.created(URI.create(PATH + transfer.getId())).build(); + } +} diff --git a/transfer/src/main/java/de/juplo/kafka/payment/transfer/impl/TransferServiceImpl.java b/transfer/src/main/java/de/juplo/kafka/payment/transfer/impl/TransferServiceImpl.java new file mode 100644 index 0000000..9a24b2b --- /dev/null +++ b/transfer/src/main/java/de/juplo/kafka/payment/transfer/impl/TransferServiceImpl.java @@ -0,0 +1,73 @@ +package de.juplo.kafka.payment.transfer.impl; + + +import com.fasterxml.jackson.databind.ObjectMapper; +import de.juplo.kafka.payment.transfer.api.Transfer; +import de.juplo.kafka.payment.transfer.api.TransferService; +import de.juplo.kafka.payment.transfer.persistence.TransferRepository; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.context.request.async.DeferredResult; + +import javax.validation.Valid; +import java.net.URI; +import java.util.UUID; + + +@Slf4j +@RequiredArgsConstructor +public class TransferServiceImpl implements TransferService +{ + private final TransferRepository repository; + private final KafkaProducer producer; + private final ObjectMapper mapper; + private final String topic; + + + @Override + public void initiate(Transfer transfer) + { + repository + .get(transfer.getId()) + .ifPresentOrElse( + stored -> + { + + }, + () -> + { + + }); + try + { + ProducerRecord record = + new ProducerRecord<>( + topic, + transfer.getId(), + mapper.writeValueAsString(transfer)); + + producer.send(record, (metadata, exception) -> + { + if (exception != null) + { + log.error("Could not place order {}: {}", transfer, exception.toString()); + result.setErrorResult(exception); + return; + } + + result.setResult(ResponseEntity.created(URI.create(path + transfer.getId())).build()); + }); + } + catch (Exception e) + { + log.error("Unexpected exception!", e); + result.setErrorResult(e); + } + + return result; + } +} diff --git a/transfer/src/main/java/de/juplo/kafka/payment/transfer/persistence/TransferRepository.java b/transfer/src/main/java/de/juplo/kafka/payment/transfer/persistence/TransferRepository.java new file mode 100644 index 0000000..5b3d8b4 --- /dev/null +++ b/transfer/src/main/java/de/juplo/kafka/payment/transfer/persistence/TransferRepository.java @@ -0,0 +1,13 @@ +package de.juplo.kafka.payment.transfer.persistence; + +import de.juplo.kafka.payment.transfer.api.Transfer; + +import java.util.Optional; +import java.util.UUID; + + +public interface TransferRepository +{ + void put(Transfer transfer); + Optional get(UUID uuid); +}