WIP
authorKai Moritz <kai@juplo.de>
Fri, 11 Jun 2021 14:53:13 +0000 (16:53 +0200)
committerKai Moritz <kai@juplo.de>
Fri, 11 Jun 2021 14:53:13 +0000 (16:53 +0200)
transfer/order.avsc [deleted file]
transfer/pom.xml
transfer/src/main/java/de/juplo/kafka/payment/transfer/TransferService.java
transfer/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java
transfer/src/main/resources/transfer.avr [new file with mode: 0644]

diff --git a/transfer/order.avsc b/transfer/order.avsc
deleted file mode 100644 (file)
index 81775da..0000000
+++ /dev/null
@@ -1,22 +0,0 @@
-[
-  {
-    "namespace": "de.juplo.kafka.payment.avro",
-    "type": "enum",
-    "name": "OrderState",
-    "symbols" : [ "CREATED" ],
-    "default" : "CREATED"
-  },
-  {
-    "namespace": "de.juplo.kafka.payment.avro",
-    "type": "record",
-    "name": "Order",
-    "fields": [
-      { "name": "id", "type": "string" },
-      { "name": "state", "type": "OrderState" },
-      { "name": "customerId", "type": "long" },
-      { "name": "orderId", "type": "long" },
-      { "name": "productId",  "type": "long" },
-      { "name": "quantity", "type": "int" }
-    ]
-  }
-]
index f1305cf..d18f4f3 100644 (file)
@@ -97,7 +97,7 @@
                                                        <goal>schema</goal>
                                                </goals>
                                                <configuration>
-                                                       <sourceDirectory>${project.basedir}/</sourceDirectory>
+                                                       <sourceDirectory>${project.basedir}/src/main/resources</sourceDirectory>
                                                        <outputDirectory>${project.basedir}/target/generated-sources/</outputDirectory>
                                                </configuration>
                                        </execution>
index 15550af..1b81028 100644 (file)
@@ -25,13 +25,13 @@ public class TransferService
 {
   private final static Logger LOG = LoggerFactory.getLogger(TransferService.class);
 
-  private final KafkaProducer<UUID, Order> producer;
+  private final KafkaProducer<UUID, TransferBean> producer;
   private final String topic;
   private final String path;
 
 
   TransferService(
-      final KafkaProducer<UUID,Order> producer,
+      final KafkaProducer<UUID, TransferBean> producer,
       final TransferServiceProperties properties)
   {
     this.producer = producer;
@@ -44,32 +44,32 @@ public class TransferService
       path = "/orders",
       consumes = MediaType.APPLICATION_JSON_UTF8_VALUE,
       produces = MediaType.TEXT_PLAIN_VALUE)
-  public DeferredResult<ResponseEntity<?>> placeOrder(@Valid @RequestBody Transfer order)
+  public DeferredResult<ResponseEntity<?>> placeOrder(@Valid @RequestBody TransferBean transfer)
   {
     DeferredResult<ResponseEntity<?>> result = new DeferredResult<>();
 
     try
     {
       UUID uuid = UUID.randomUUID();
-      ProducerRecord<UUID, Order> record =
+      ProducerRecord<UUID, TransferBean> record =
           new ProducerRecord<>(
               topic, 
               uuid,
-              Order
-                  .newBuilder()
-                  .setId(uuid.toString())
-                  .setState(OrderState.CREATED)
-                  .setCustomerId(order.getCustomerId())
-                  .setOrderId(order.getId())
-                  .setProductId(order.getProductId())
-                  .setQuantity(order.getQuantity())
+              Transfer
+                  .new
+                  .id(transfer.getId().toString())
+                  .setState(TransferState.CREATED)
+                  .setCustomerId(transfer.getCustomerId())
+                  .setOrderId(transfer.getId())
+                  .setProductId(transfer.getProductId())
+                  .setQuantity(transfer.getQuantity())
                   .build());
 
       producer.send(record, (metadata, exception) ->
       {
         if (exception != null)
         {
-          LOG.error("Could not place order {}: {}", order, exception.toString());
+          LOG.error("Could not place order {}: {}", transfer, exception.toString());
           result.setErrorResult(exception);
           return;
         }
index b42ca47..26b14d6 100644 (file)
@@ -4,10 +4,10 @@ package de.juplo.kafka.payment.transfer;
 import java.util.Properties;
 import java.util.UUID;
 
-import de.juplo.kafka.payment.avro.Order;
 import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.UUIDSerializer;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
@@ -21,10 +21,10 @@ import org.springframework.context.annotation.Bean;
 public class TransferServiceApplication
 {
   @Bean(destroyMethod = "close")
-  KafkaProducer<UUID, Order> producer(TransferServiceProperties properties)
+  KafkaProducer<UUID, TransferBean> producer(TransferServiceProperties properties)
   {
     Properties props = new Properties();
-    props.put("bootstrap.servers", properties.bootstrapServers);
+    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.bootstrapServers);
     props.put("schema.registry.url", properties.schemaRegistryUrl);
     props.put("key.serializer", UUIDSerializer.class.getName());
     props.put("value.serializer", SpecificAvroSerializer.class.getName());
diff --git a/transfer/src/main/resources/transfer.avr b/transfer/src/main/resources/transfer.avr
new file mode 100644 (file)
index 0000000..1aab7a2
--- /dev/null
@@ -0,0 +1,21 @@
+[
+  {
+    "namespace": "de.juplo.kafka.payment.avro",
+    "type": "enum",
+    "name": "TransferState",
+    "symbols" : [ "PENDING" ],
+    "default" : "PENDING"
+  },
+  {
+    "namespace": "de.juplo.kafka.payment.avro",
+    "type": "record",
+    "name": "Transfer",
+    "fields": [
+      { "name": "id", "type": "string" },
+      { "name": "state", "type": "TransferState" },
+      { "name": "payer", "type": "long" },
+      { "name": "payee", "type": "long" },
+      { "name": "amount", "type": "int" }
+    ]
+  }
+]