From aa415473c0fced1fc60b876f4afa1909c34c5926 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Fri, 11 Jun 2021 17:38:21 +0200 Subject: [PATCH] WIP --- .../kafka/payment/transfer/api/Transfer.java | 23 ++++++ .../payment/transfer/api/TransferService.java | 6 ++ .../controller/TransferController.java | 37 ++++++++++ .../transfer/impl/TransferServiceImpl.java | 73 +++++++++++++++++++ .../persistence/TransferRepository.java | 13 ++++ 5 files changed, 152 insertions(+) create mode 100644 transfer/src/main/java/de/juplo/kafka/payment/transfer/api/Transfer.java create mode 100644 transfer/src/main/java/de/juplo/kafka/payment/transfer/api/TransferService.java create mode 100644 transfer/src/main/java/de/juplo/kafka/payment/transfer/controller/TransferController.java create mode 100644 transfer/src/main/java/de/juplo/kafka/payment/transfer/impl/TransferServiceImpl.java create mode 100644 transfer/src/main/java/de/juplo/kafka/payment/transfer/persistence/TransferRepository.java diff --git a/transfer/src/main/java/de/juplo/kafka/payment/transfer/api/Transfer.java b/transfer/src/main/java/de/juplo/kafka/payment/transfer/api/Transfer.java new file mode 100644 index 0000000..58ca15a --- /dev/null +++ b/transfer/src/main/java/de/juplo/kafka/payment/transfer/api/Transfer.java @@ -0,0 +1,23 @@ +package de.juplo.kafka.payment.transfer.api; + +import lombok.Data; + +import javax.validation.constraints.NotNull; +import java.util.UUID; + + +/** + * Simple DTO used by the REST interface + */ +@Data +public class Transfer +{ + @NotNull(message = "Cannot be null") + private UUID id; + @NotNull(message = "Cannot be null") + private long payer; + @NotNull(message = "Cannot be null") + private long payee; + @NotNull(message = "Cannot be null") + private int amount; +} diff --git a/transfer/src/main/java/de/juplo/kafka/payment/transfer/api/TransferService.java b/transfer/src/main/java/de/juplo/kafka/payment/transfer/api/TransferService.java new file mode 100644 index 0000000..e4bb4f8 --- /dev/null +++ b/transfer/src/main/java/de/juplo/kafka/payment/transfer/api/TransferService.java @@ -0,0 +1,6 @@ +package de.juplo.kafka.payment.transfer.api; + +public interface TransferService +{ + void handleTransfer(Transfer transfer); +} diff --git a/transfer/src/main/java/de/juplo/kafka/payment/transfer/controller/TransferController.java b/transfer/src/main/java/de/juplo/kafka/payment/transfer/controller/TransferController.java new file mode 100644 index 0000000..c93fb46 --- /dev/null +++ b/transfer/src/main/java/de/juplo/kafka/payment/transfer/controller/TransferController.java @@ -0,0 +1,37 @@ +package de.juplo.kafka.payment.transfer.controller; + + +import de.juplo.kafka.payment.transfer.api.Transfer; +import de.juplo.kafka.payment.transfer.impl.TransferServiceImpl; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RestController; + +import javax.validation.Valid; +import java.net.URI; + + +@RestController +@RequiredArgsConstructor +@Slf4j +public class TransferController +{ + public final static String PATH = "/transfers"; + + private final TransferServiceImpl service; + + + @PostMapping( + path = PATH, + consumes = MediaType.APPLICATION_JSON_VALUE, + produces = MediaType.APPLICATION_JSON_VALUE) + public ResponseEntity transfer(@Valid @RequestBody Transfer transfer) + { + service.initiate(transfer); + return ResponseEntity.created(URI.create(PATH + transfer.getId())).build(); + } +} diff --git a/transfer/src/main/java/de/juplo/kafka/payment/transfer/impl/TransferServiceImpl.java b/transfer/src/main/java/de/juplo/kafka/payment/transfer/impl/TransferServiceImpl.java new file mode 100644 index 0000000..9a24b2b --- /dev/null +++ b/transfer/src/main/java/de/juplo/kafka/payment/transfer/impl/TransferServiceImpl.java @@ -0,0 +1,73 @@ +package de.juplo.kafka.payment.transfer.impl; + + +import com.fasterxml.jackson.databind.ObjectMapper; +import de.juplo.kafka.payment.transfer.api.Transfer; +import de.juplo.kafka.payment.transfer.api.TransferService; +import de.juplo.kafka.payment.transfer.persistence.TransferRepository; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.context.request.async.DeferredResult; + +import javax.validation.Valid; +import java.net.URI; +import java.util.UUID; + + +@Slf4j +@RequiredArgsConstructor +public class TransferServiceImpl implements TransferService +{ + private final TransferRepository repository; + private final KafkaProducer producer; + private final ObjectMapper mapper; + private final String topic; + + + @Override + public void initiate(Transfer transfer) + { + repository + .get(transfer.getId()) + .ifPresentOrElse( + stored -> + { + + }, + () -> + { + + }); + try + { + ProducerRecord record = + new ProducerRecord<>( + topic, + transfer.getId(), + mapper.writeValueAsString(transfer)); + + producer.send(record, (metadata, exception) -> + { + if (exception != null) + { + log.error("Could not place order {}: {}", transfer, exception.toString()); + result.setErrorResult(exception); + return; + } + + result.setResult(ResponseEntity.created(URI.create(path + transfer.getId())).build()); + }); + } + catch (Exception e) + { + log.error("Unexpected exception!", e); + result.setErrorResult(e); + } + + return result; + } +} diff --git a/transfer/src/main/java/de/juplo/kafka/payment/transfer/persistence/TransferRepository.java b/transfer/src/main/java/de/juplo/kafka/payment/transfer/persistence/TransferRepository.java new file mode 100644 index 0000000..5b3d8b4 --- /dev/null +++ b/transfer/src/main/java/de/juplo/kafka/payment/transfer/persistence/TransferRepository.java @@ -0,0 +1,13 @@ +package de.juplo.kafka.payment.transfer.persistence; + +import de.juplo.kafka.payment.transfer.api.Transfer; + +import java.util.Optional; +import java.util.UUID; + + +public interface TransferRepository +{ + void put(Transfer transfer); + Optional get(UUID uuid); +} -- 2.20.1