WIP
authorKai Moritz <kai@juplo.de>
Fri, 11 Jun 2021 15:38:21 +0000 (17:38 +0200)
committerKai Moritz <kai@juplo.de>
Fri, 11 Jun 2021 15:38:21 +0000 (17:38 +0200)
transfer/src/main/java/de/juplo/kafka/payment/transfer/api/Transfer.java [new file with mode: 0644]
transfer/src/main/java/de/juplo/kafka/payment/transfer/api/TransferService.java [new file with mode: 0644]
transfer/src/main/java/de/juplo/kafka/payment/transfer/controller/TransferController.java [new file with mode: 0644]
transfer/src/main/java/de/juplo/kafka/payment/transfer/impl/TransferServiceImpl.java [new file with mode: 0644]
transfer/src/main/java/de/juplo/kafka/payment/transfer/persistence/TransferRepository.java [new file with mode: 0644]

diff --git a/transfer/src/main/java/de/juplo/kafka/payment/transfer/api/Transfer.java b/transfer/src/main/java/de/juplo/kafka/payment/transfer/api/Transfer.java
new file mode 100644 (file)
index 0000000..58ca15a
--- /dev/null
@@ -0,0 +1,23 @@
+package de.juplo.kafka.payment.transfer.api;
+
+import lombok.Data;
+
+import javax.validation.constraints.NotNull;
+import java.util.UUID;
+
+
+/**
+ * Simple DTO used by the REST interface
+ */
+@Data
+public class Transfer
+{
+  @NotNull(message = "Cannot be null")
+  private UUID id;
+  @NotNull(message = "Cannot be null")
+  private long payer;
+  @NotNull(message = "Cannot be null")
+  private long payee;
+  @NotNull(message = "Cannot be null")
+  private int amount;
+}
diff --git a/transfer/src/main/java/de/juplo/kafka/payment/transfer/api/TransferService.java b/transfer/src/main/java/de/juplo/kafka/payment/transfer/api/TransferService.java
new file mode 100644 (file)
index 0000000..e4bb4f8
--- /dev/null
@@ -0,0 +1,6 @@
+package de.juplo.kafka.payment.transfer.api;
+
+public interface TransferService
+{
+  void handleTransfer(Transfer transfer);
+}
diff --git a/transfer/src/main/java/de/juplo/kafka/payment/transfer/controller/TransferController.java b/transfer/src/main/java/de/juplo/kafka/payment/transfer/controller/TransferController.java
new file mode 100644 (file)
index 0000000..c93fb46
--- /dev/null
@@ -0,0 +1,37 @@
+package de.juplo.kafka.payment.transfer.controller;
+
+
+import de.juplo.kafka.payment.transfer.api.Transfer;
+import de.juplo.kafka.payment.transfer.impl.TransferServiceImpl;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.http.MediaType;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RestController;
+
+import javax.validation.Valid;
+import java.net.URI;
+
+
+@RestController
+@RequiredArgsConstructor
+@Slf4j
+public class TransferController
+{
+  public final static String PATH = "/transfers";
+
+  private final TransferServiceImpl service;
+
+
+  @PostMapping(
+      path = PATH,
+      consumes = MediaType.APPLICATION_JSON_VALUE,
+      produces = MediaType.APPLICATION_JSON_VALUE)
+  public ResponseEntity<?> transfer(@Valid @RequestBody Transfer transfer)
+  {
+    service.initiate(transfer);
+    return ResponseEntity.created(URI.create(PATH + transfer.getId())).build();
+  }
+}
diff --git a/transfer/src/main/java/de/juplo/kafka/payment/transfer/impl/TransferServiceImpl.java b/transfer/src/main/java/de/juplo/kafka/payment/transfer/impl/TransferServiceImpl.java
new file mode 100644 (file)
index 0000000..9a24b2b
--- /dev/null
@@ -0,0 +1,73 @@
+package de.juplo.kafka.payment.transfer.impl;
+
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import de.juplo.kafka.payment.transfer.api.Transfer;
+import de.juplo.kafka.payment.transfer.api.TransferService;
+import de.juplo.kafka.payment.transfer.persistence.TransferRepository;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.context.request.async.DeferredResult;
+
+import javax.validation.Valid;
+import java.net.URI;
+import java.util.UUID;
+
+
+@Slf4j
+@RequiredArgsConstructor
+public class TransferServiceImpl implements TransferService
+{
+  private final TransferRepository repository;
+  private final KafkaProducer<UUID, String> producer;
+  private final ObjectMapper mapper;
+  private final String topic;
+
+
+  @Override
+  public void initiate(Transfer transfer)
+  {
+    repository
+        .get(transfer.getId())
+        .ifPresentOrElse(
+            stored ->
+            {
+
+            },
+            () ->
+            {
+
+            });
+    try
+    {
+      ProducerRecord<UUID, String> record =
+          new ProducerRecord<>(
+              topic,
+              transfer.getId(),
+              mapper.writeValueAsString(transfer));
+
+      producer.send(record, (metadata, exception) ->
+      {
+        if (exception != null)
+        {
+          log.error("Could not place order {}: {}", transfer, exception.toString());
+          result.setErrorResult(exception);
+          return;
+        }
+
+        result.setResult(ResponseEntity.created(URI.create(path + transfer.getId())).build());
+      });
+    }
+    catch (Exception e)
+    {
+      log.error("Unexpected exception!", e);
+      result.setErrorResult(e);
+    }
+
+    return result;
+  }
+}
diff --git a/transfer/src/main/java/de/juplo/kafka/payment/transfer/persistence/TransferRepository.java b/transfer/src/main/java/de/juplo/kafka/payment/transfer/persistence/TransferRepository.java
new file mode 100644 (file)
index 0000000..5b3d8b4
--- /dev/null
@@ -0,0 +1,13 @@
+package de.juplo.kafka.payment.transfer.persistence;
+
+import de.juplo.kafka.payment.transfer.api.Transfer;
+
+import java.util.Optional;
+import java.util.UUID;
+
+
+public interface TransferRepository
+{
+  void put(Transfer transfer);
+  Optional<Transfer> get(UUID uuid);
+}