WIP
authorKai Moritz <kai@juplo.de>
Fri, 11 Jun 2021 15:04:47 +0000 (17:04 +0200)
committerKai Moritz <kai@juplo.de>
Fri, 11 Jun 2021 15:04:47 +0000 (17:04 +0200)
docker-compose.yml
transfer/pom.xml
transfer/src/main/java/de/juplo/kafka/payment/transfer/TransferBean.java [deleted file]
transfer/src/main/java/de/juplo/kafka/payment/transfer/TransferService.java
transfer/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java
transfer/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceProperties.java
transfer/src/main/resources/transfer.avr [deleted file]

index e6655e7..ef24c74 100644 (file)
@@ -20,20 +20,7 @@ services:
     depends_on:
       - zookeeper
 
-  schema-registry:
-    image: confluentinc/cp-schema-registry:6.2.0
-    hostname: schema-registry
-    ports:
-      - "8081:8081"
-    environment:
-      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka:9093
-      SCHEMA_REGISTRY_HOST_NAME: schema-registry
-      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
-    depends_on:
-      - zookeeper
-      - kafka
-
-  transfer
+  transfer:
     image: juplo/transfer-service:mvp
     ports:
       - "8091:8080"
index d18f4f3..daea31b 100644 (file)
@@ -15,7 +15,6 @@
        <description>An MVP for the Transfer Service</description>
        <properties>
                <java.version>11</java.version>
-               <avro.version>1.10.2</avro.version>
                <confluent.version>6.2.0</confluent.version>
                <kafka.version>2.8.0</kafka.version>
        </properties>
                        <scope>runtime</scope>
                        <optional>true</optional>
                </dependency>
-               <dependency>
-                       <groupId>org.apache.avro</groupId>
-                       <artifactId>avro</artifactId>
-                       <version>${avro.version}</version>
-               </dependency>
-               <dependency>
-                       <groupId>io.confluent</groupId>
-                       <artifactId>kafka-streams-avro-serde</artifactId>
-                       <version>${confluent.version}</version>
-               </dependency>
                <dependency>
                        <groupId>jakarta.validation</groupId>
                        <artifactId>jakarta.validation-api</artifactId>
                </dependency>
        </dependencies>
 
-       <repositories>
-               <repository>
-                       <id>confluent</id>
-                       <url>https://packages.confluent.io/maven/</url>
-               </repository>
-       </repositories>
-
        <build>
                <plugins>
                        <plugin>
                                        </excludes>
                                </configuration>
                        </plugin>
-                       <plugin>
-                               <groupId>org.apache.avro</groupId>
-                               <artifactId>avro-maven-plugin</artifactId>
-                               <version>${avro.version}</version>
-                               <executions>
-                                       <execution>
-                                               <phase>generate-sources</phase>
-                                               <goals>
-                                                       <goal>schema</goal>
-                                               </goals>
-                                               <configuration>
-                                                       <sourceDirectory>${project.basedir}/src/main/resources</sourceDirectory>
-                                                       <outputDirectory>${project.basedir}/target/generated-sources/</outputDirectory>
-                                               </configuration>
-                                       </execution>
-                               </executions>
-                       </plugin>
                </plugins>
        </build>
 
diff --git a/transfer/src/main/java/de/juplo/kafka/payment/transfer/TransferBean.java b/transfer/src/main/java/de/juplo/kafka/payment/transfer/TransferBean.java
deleted file mode 100644 (file)
index 498fb69..0000000
+++ /dev/null
@@ -1,25 +0,0 @@
-package de.juplo.kafka.payment.transfer;
-
-import lombok.Builder;
-import lombok.Data;
-
-import javax.validation.constraints.NotNull;
-import java.util.UUID;
-
-
-/**
- * Simple DTO used by the REST interface
- */
-@Data
-@Builder
-public class TransferBean
-{
-  @NotNull(message = "Cannot be null")
-  private UUID id;
-  @NotNull(message = "Cannot be null")
-  private long payer;
-  @NotNull(message = "Cannot be null")
-  private long payee;
-  @NotNull(message = "Cannot be null")
-  private int amount;
-}
index 1b81028..9e72af1 100644 (file)
@@ -1,11 +1,7 @@
 package de.juplo.kafka.payment.transfer;
 
 
-import java.net.URI;
-import java.util.UUID;
-
-import de.juplo.kafka.payment.avro.Order;
-import de.juplo.kafka.payment.avro.OrderState;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.slf4j.Logger;
@@ -18,6 +14,8 @@ import org.springframework.web.bind.annotation.RestController;
 import org.springframework.web.context.request.async.DeferredResult;
 
 import javax.validation.Valid;
+import java.net.URI;
+import java.util.UUID;
 
 
 @RestController
@@ -25,45 +23,39 @@ public class TransferService
 {
   private final static Logger LOG = LoggerFactory.getLogger(TransferService.class);
 
-  private final KafkaProducer<UUID, TransferBean> producer;
+  private final KafkaProducer<UUID, String> producer;
+  private final ObjectMapper mapper;
   private final String topic;
   private final String path;
 
 
   TransferService(
-      final KafkaProducer<UUID, TransferBean> producer,
+      final KafkaProducer<UUID, String> producer,
+      final ObjectMapper mapper,
       final TransferServiceProperties properties)
   {
     this.producer = producer;
+    this.mapper = mapper;
     this.topic = properties.getTopic();
     this.path = properties.getPath();
   }
 
 
   @PostMapping(
-      path = "/orders",
-      consumes = MediaType.APPLICATION_JSON_UTF8_VALUE,
+      path = "/transfer",
+      consumes = MediaType.APPLICATION_JSON_VALUE,
       produces = MediaType.TEXT_PLAIN_VALUE)
-  public DeferredResult<ResponseEntity<?>> placeOrder(@Valid @RequestBody TransferBean transfer)
+  public DeferredResult<ResponseEntity<?>> transfer(@Valid @RequestBody Transfer transfer)
   {
     DeferredResult<ResponseEntity<?>> result = new DeferredResult<>();
 
     try
     {
-      UUID uuid = UUID.randomUUID();
-      ProducerRecord<UUID, TransferBean> record =
+      ProducerRecord<UUID, String> record =
           new ProducerRecord<>(
-              topic, 
-              uuid,
-              Transfer
-                  .new
-                  .id(transfer.getId().toString())
-                  .setState(TransferState.CREATED)
-                  .setCustomerId(transfer.getCustomerId())
-                  .setOrderId(transfer.getId())
-                  .setProductId(transfer.getProductId())
-                  .setQuantity(transfer.getQuantity())
-                  .build());
+              topic,
+              transfer.getId(),
+              mapper.writeValueAsString(transfer));
 
       producer.send(record, (metadata, exception) ->
       {
@@ -74,7 +66,7 @@ public class TransferService
           return;
         }
 
-        result.setResult(ResponseEntity.created(URI.create(path + uuid)).build());
+        result.setResult(ResponseEntity.created(URI.create(path + transfer.getId())).build());
       });
     }
     catch (Exception e)
index 26b14d6..484d9c2 100644 (file)
@@ -1,19 +1,19 @@
 package de.juplo.kafka.payment.transfer;
 
 
-import java.util.Properties;
-import java.util.UUID;
-
-import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.serialization.UUIDSerializer;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.annotation.Bean;
 
+import java.util.Properties;
+import java.util.UUID;
+
 
 @SpringBootApplication
 @EnableConfigurationProperties(TransferServiceProperties.class)
@@ -21,13 +21,12 @@ import org.springframework.context.annotation.Bean;
 public class TransferServiceApplication
 {
   @Bean(destroyMethod = "close")
-  KafkaProducer<UUID, TransferBean> producer(TransferServiceProperties properties)
+  KafkaProducer<UUID, String> producer(TransferServiceProperties properties)
   {
     Properties props = new Properties();
     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.bootstrapServers);
-    props.put("schema.registry.url", properties.schemaRegistryUrl);
-    props.put("key.serializer", UUIDSerializer.class.getName());
-    props.put("value.serializer", SpecificAvroSerializer.class.getName());
+    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, UUIDSerializer.class);
+    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
 
     return new KafkaProducer<>(props);
   }
index 8a50001..bea701f 100644 (file)
@@ -1,55 +1,17 @@
 package de.juplo.kafka.payment.transfer;
 
 
+import lombok.Getter;
+import lombok.Setter;
 import org.springframework.boot.context.properties.ConfigurationProperties;
 
 
-@ConfigurationProperties("take-order")
+@ConfigurationProperties("juplo.transfer")
+@Getter
+@Setter
 public class TransferServiceProperties
 {
-  String bootstrapServers = "kafka:9092";
-  String schemaRegistryUrl = "http://schema-registry:8081";
-  String topic = "orders";
-  String path = "http://details:8092/orders/";
-
-
-  public String getBootstrapServers()
-  {
-    return bootstrapServers;
-  }
-
-  public void setBootstrapServers(String bootstrapServers)
-  {
-    this.bootstrapServers = bootstrapServers;
-  }
-
-  public String getSchemaRegistryUrl()
-  {
-    return schemaRegistryUrl;
-  }
-
-  public void setSchemaRegistryUrl(String schemaRegistryUrl)
-  {
-    this.schemaRegistryUrl = schemaRegistryUrl;
-  }
-
-  public String getTopic()
-  {
-    return topic;
-  }
-
-  public void setTopic(String topic)
-  {
-    this.topic = topic;
-  }
-
-  public String getPath()
-  {
-    return path;
-  }
-
-  public void setPath(String path)
-  {
-    this.path = path;
-  }
+  String bootstrapServers = "localhost:9092";
+  String topic = "transfers";
+  String path = "http://details:8092/transfers/";
 }
diff --git a/transfer/src/main/resources/transfer.avr b/transfer/src/main/resources/transfer.avr
deleted file mode 100644 (file)
index 1aab7a2..0000000
+++ /dev/null
@@ -1,21 +0,0 @@
-[
-  {
-    "namespace": "de.juplo.kafka.payment.avro",
-    "type": "enum",
-    "name": "TransferState",
-    "symbols" : [ "PENDING" ],
-    "default" : "PENDING"
-  },
-  {
-    "namespace": "de.juplo.kafka.payment.avro",
-    "type": "record",
-    "name": "Transfer",
-    "fields": [
-      { "name": "id", "type": "string" },
-      { "name": "state", "type": "TransferState" },
-      { "name": "payer", "type": "long" },
-      { "name": "payee", "type": "long" },
-      { "name": "amount", "type": "int" }
-    ]
-  }
-]