streams - Übungen - Microservices - Schritt 03
authorKai Moritz <kai@juplo.de>
Sun, 7 Jun 2020 11:00:14 +0000 (13:00 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 7 Jun 2020 15:49:28 +0000 (17:49 +0200)
--
Validate-Order-Service implementiert

* Lauscht auf dem Topic "orders"
* Beachtet nur Order-Aufträge im Zustand CREATED
* Prüft, dass "consumerID", "productID" und "quantity" größer 0 sind
* Wenn ja: "PASS", ansonsten "FAIL"

14 files changed:
.gitignore
README.sh
details/pom.xml
docker-compose.yml
pom.xml
take-order/pom.xml
validate-order/Dockerfile [new file with mode: 0644]
validate-order/order-validation.avsc [new file with mode: 0644]
validate-order/order.avsc [new file with mode: 0644]
validate-order/pom.xml [new file with mode: 0644]
validate-order/src/main/java/de/trion/microservices/validateorder/Application.java [new file with mode: 0644]
validate-order/src/main/java/de/trion/microservices/validateorder/ApplicationProperties.java [new file with mode: 0644]
validate-order/src/main/java/de/trion/microservices/validateorder/ValidateOrderService.java [new file with mode: 0644]
validate-order/src/main/resources/application.properties [new file with mode: 0644]

index df2dd73..a3e1a2e 100644 (file)
@@ -2,3 +2,5 @@ take-order/src/main/java/de/trion/microservices/avro
 take-order/target
 details/src/main/java/de/trion/microservices/avro
 details/target
+validate-order/src/main/java/de/trion/microservices/avro
+validate-order/target
index b357ba3..3437cc7 100755 (executable)
--- a/README.sh
+++ b/README.sh
@@ -13,18 +13,22 @@ mvn package
 
 docker build -t trion/take-order-service:01 take-order
 docker build -t trion/details-service:02 details
+docker build -t trion/validate-order-service:03 validate-order
 
 docker-compose up -d zookeeper kafka schema-registry
 
 while ! [[ $(zookeeper-shell zookeeper:2181 ls /brokers/ids 2> /dev/null) =~ 1001 ]]; do echo "Waiting for kafka..."; sleep 1; done
 
 kafka-topics --zookeeper zookeeper:2181 --if-not-exists --create --replication-factor 1 --partitions 5 --topic orders
+kafka-topics --zookeeper zookeeper:2181 --if-not-exists --create --replication-factor 1 --partitions 5 --topic validation
 
-docker-compose up -d take-order details
+docker-compose up -d take-order validate-order details
 
 kafka-avro-console-consumer --bootstrap-server kafka:9092 --topic orders &
+kafka-avro-console-consumer --bootstrap-server kafka:9092 --topic validation &
 while ! [[ $(http 0:8091/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for take-order..."; sleep 1; done
 while ! [[ $(http 0:8092/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for details..."; sleep 1; done
+while ! [[ $(http 0:8093/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for validate-order..."; sleep 1; done
 http -v post 0:8091/orders Accept:*/* id=1 customerId=2 productId=234 quantity=5
 http -v post 0:8091/orders Accept:*/* id=1 customerId=2 productId=234 quantity=
 http -v post 0:8091/orders Accept:*/* id=1 customerId=2 productId=234 quantity=-5
index 492874b..95238f8 100644 (file)
@@ -5,7 +5,7 @@
   <parent>
     <groupId>de.trion.kafka.microservices</groupId>
     <artifactId>order-example</artifactId>
-    <version>02</version>
+    <version>03</version>
   </parent>
 
   <groupId>de.trion.kafka.microservices</groupId>
index 7714d3a..67d1327 100644 (file)
@@ -42,6 +42,16 @@ services:
       - kafka
       - schema-registry
 
+  validate-order:
+    image: trion/validate-order-service:03
+    hostname: validate-order
+    ports:
+      - "8093:8080"
+    depends_on:
+      - zookeeper
+      - kafka
+      - schema-registry
+
   details:
     image: trion/details-service:02
     hostname: details
diff --git a/pom.xml b/pom.xml
index 88ebc8b..ce8d87a 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -11,7 +11,7 @@
   <groupId>de.trion.kafka.microservices</groupId>
   <artifactId>order-example</artifactId>
   <name>Order Example</name>
-  <version>02</version>
+  <version>03</version>
   <packaging>pom</packaging>
 
   <properties>
@@ -23,6 +23,7 @@
   <modules>
     <module>take-order</module>
     <module>details</module>
+    <module>validate-order</module>
   </modules>
 
 </project>
index f5ad727..bf46f8e 100644 (file)
@@ -5,7 +5,7 @@
   <parent>
     <groupId>de.trion.kafka.microservices</groupId>
     <artifactId>order-example</artifactId>
-    <version>02</version>
+    <version>03</version>
   </parent>
 
   <groupId>de.trion.kafka.microservices</groupId>
diff --git a/validate-order/Dockerfile b/validate-order/Dockerfile
new file mode 100644 (file)
index 0000000..a32db2c
--- /dev/null
@@ -0,0 +1,5 @@
+FROM openjdk:8-jre-slim
+COPY target/validate-order-03-SNAPSHOT.jar /opt/
+EXPOSE 8080
+ENTRYPOINT ["java", "-jar", "/opt/validate-order-03-SNAPSHOT.jar"]
+CMD []
diff --git a/validate-order/order-validation.avsc b/validate-order/order-validation.avsc
new file mode 100644 (file)
index 0000000..9db183d
--- /dev/null
@@ -0,0 +1,31 @@
+[
+{
+  "namespace": "de.trion.microservices.avro",
+  "type": "enum",
+  "name": "OrderValidationType",
+  "symbols" : [ "ORDER_DETAILS_CHECK" ]
+},
+{
+  "namespace": "de.trion.microservices.avro",
+  "type": "enum",
+  "name": "OrderValidationResult",
+  "symbols" : [ "PASS", "FAIL" ]
+},
+{
+  "namespace": "de.trion.microservices.avro",
+  "type": "record",
+  "name": "OrderValidation",
+  "fields": [
+      { "name": "orderId", "type": "string" },
+      { "name": "checkType", "type": "OrderValidationType" },
+      { "name": "validationResult",  "type": "OrderValidationResult" },
+      {
+        "name": "messages",
+        "type":
+        {
+          "type": "array",
+          "items": "string"
+        }
+      }
+  ]
+}]
diff --git a/validate-order/order.avsc b/validate-order/order.avsc
new file mode 100644 (file)
index 0000000..c857b50
--- /dev/null
@@ -0,0 +1,21 @@
+[
+  {
+    "namespace": "de.trion.microservices.avro",
+    "type": "enum",
+    "name": "OrderState",
+    "symbols" : [ "CREATED" ]
+  },
+  {
+    "namespace": "de.trion.microservices.avro",
+    "type": "record",
+    "name": "Order",
+    "fields": [
+      { "name": "id", "type": "string" },
+      { "name": "state", "type": "OrderState" },
+      { "name": "customerId", "type": "long" },
+      { "name": "orderId", "type": "long" },
+      { "name": "productId",  "type": "long" },
+      { "name": "quantity", "type": "int" }
+    ]
+  }
+]
diff --git a/validate-order/pom.xml b/validate-order/pom.xml
new file mode 100644 (file)
index 0000000..63bded2
--- /dev/null
@@ -0,0 +1,80 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>de.trion.kafka.microservices</groupId>
+    <artifactId>order-example</artifactId>
+    <version>03</version>
+  </parent>
+
+  <groupId>de.trion.kafka.microservices</groupId>
+  <artifactId>validate-order</artifactId>
+  <name>Order Validation Service</name>
+  <version>03-SNAPSHOT</version>
+
+  <properties>
+    <avro.version>1.9.0</avro.version>
+    <confluent.version>5.3.0</confluent.version>
+    <kafka.version>2.3.0</kafka.version>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-starter-actuator</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-starter-web</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-streams</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro</artifactId>
+      <version>${avro.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>io.confluent</groupId>
+      <artifactId>kafka-streams-avro-serde</artifactId>
+      <version>${confluent.version}</version>
+    </dependency>
+  </dependencies>
+
+  <repositories>
+    <repository>
+      <id>confluent</id>
+      <url>https://packages.confluent.io/maven/</url>
+    </repository>
+  </repositories>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.springframework.boot</groupId>
+        <artifactId>spring-boot-maven-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.avro</groupId>
+        <artifactId>avro-maven-plugin</artifactId>
+        <version>${avro.version}</version>
+        <executions>
+          <execution>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>schema</goal>
+            </goals>
+            <configuration>
+              <sourceDirectory>${project.basedir}/</sourceDirectory>
+              <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>
diff --git a/validate-order/src/main/java/de/trion/microservices/validateorder/Application.java b/validate-order/src/main/java/de/trion/microservices/validateorder/Application.java
new file mode 100644 (file)
index 0000000..bfdde7a
--- /dev/null
@@ -0,0 +1,22 @@
+package de.trion.microservices.validateorder;
+
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+
+
+@SpringBootApplication
+@EnableConfigurationProperties(ApplicationProperties.class)
+public class Application
+{
+  private final static Logger LOG = LoggerFactory.getLogger(Application.class);
+
+
+  public static void main(String[] args)
+  {
+    SpringApplication.run(Application.class, args);
+  }
+}
\ No newline at end of file
diff --git a/validate-order/src/main/java/de/trion/microservices/validateorder/ApplicationProperties.java b/validate-order/src/main/java/de/trion/microservices/validateorder/ApplicationProperties.java
new file mode 100644 (file)
index 0000000..c632eef
--- /dev/null
@@ -0,0 +1,55 @@
+package de.trion.microservices.validateorder;
+
+
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+
+@ConfigurationProperties("take-order")
+public class ApplicationProperties
+{
+  String bootstrapServers = "kafka:9092";
+  String schemaRegistryUrl = "http://schema-registry:8081";
+  String ordersTopic = "orders";
+  String validationTopic = "validation";
+
+
+  public String getBootstrapServers()
+  {
+    return bootstrapServers;
+  }
+
+  public void setBootstrapServers(String bootstrapServers)
+  {
+    this.bootstrapServers = bootstrapServers;
+  }
+
+  public String getSchemaRegistryUrl()
+  {
+    return schemaRegistryUrl;
+  }
+
+  public void setSchemaRegistryUrl(String schemaRegistryUrl)
+  {
+    this.schemaRegistryUrl = schemaRegistryUrl;
+  }
+
+  public String getOrdersTopic()
+  {
+    return ordersTopic;
+  }
+
+  public void setOrdersTopic(String topic)
+  {
+    this.ordersTopic = topic;
+  }
+
+  public String getValidationTopic()
+  {
+    return validationTopic;
+  }
+
+  public void setValidationTopic(String topic)
+  {
+    this.validationTopic = topic;
+  }
+}
diff --git a/validate-order/src/main/java/de/trion/microservices/validateorder/ValidateOrderService.java b/validate-order/src/main/java/de/trion/microservices/validateorder/ValidateOrderService.java
new file mode 100644 (file)
index 0000000..1ad46ce
--- /dev/null
@@ -0,0 +1,101 @@
+package de.trion.microservices.validateorder;
+
+
+import de.trion.microservices.avro.Order;
+import de.trion.microservices.avro.OrderState;
+import de.trion.microservices.avro.OrderValidation;
+import static de.trion.microservices.avro.OrderValidationResult.FAIL;
+import static de.trion.microservices.avro.OrderValidationResult.PASS;
+import static de.trion.microservices.avro.OrderValidationType.ORDER_DETAILS_CHECK;
+import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.Topology;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+
+@Component
+public class ValidateOrderService
+{
+  final static Logger LOG = LoggerFactory.getLogger(ValidateOrderService.class);
+
+  private final String orders;
+  private final String validation;
+  private final KafkaStreams streams;
+
+
+  public ValidateOrderService(ApplicationProperties config)
+  {
+    orders = config.ordersTopic;
+    validation = config.validationTopic;
+
+    Properties properties = new Properties();
+    properties.put("bootstrap.servers", config.bootstrapServers);
+    properties.put("application.id", "validate-order");
+    properties.put("schema.registry.url", config.schemaRegistryUrl);
+    properties.put("default.key.serde", Serdes.String().getClass());
+    properties.put("default.value.serde", SpecificAvroSerde.class);
+
+    StreamsBuilder builder = new StreamsBuilder();
+    builder
+        .<String,Order>stream(orders)
+        .filter((id, order) -> order.getState() == OrderState.CREATED)
+        .mapValues((order) ->
+        {
+          List<CharSequence> messages = new LinkedList<>();
+
+          if (order.getCustomerId() < 1)
+            messages.add("Customer-ID must be greater than 0");
+          if (order.getProductId() < 1)
+            messages.add("Product-ID must be greater than 0");
+          if (order.getQuantity() < 1)
+            messages.add("The ordered quantity must be greater than 0");
+
+          return
+              OrderValidation
+                  .newBuilder()
+                  .setOrderId(order.getId())
+                  .setCheckType(ORDER_DETAILS_CHECK)
+                  .setValidationResult(messages.isEmpty() ? PASS : FAIL)
+                  .setMessages(messages)
+                  .build();
+        })
+        .to(validation);
+
+    Topology topology = builder.build();
+    streams = new KafkaStreams(topology, properties);
+    streams.setUncaughtExceptionHandler((Thread t, Throwable e) ->
+    {
+      LOG.error("Unexpected error in thread {}: {}", t, e.toString());
+      try
+      {
+        streams.close();
+      }
+      catch (Exception ex)
+      {
+        LOG.error("Could not close KafkaStreams!", ex);
+      }
+    });
+  }
+
+
+  @PostConstruct
+  public void start()
+  {
+    streams.start();
+  }
+
+  @PreDestroy
+  public void stop()
+  {
+    streams.close();
+  }
+}
diff --git a/validate-order/src/main/resources/application.properties b/validate-order/src/main/resources/application.properties
new file mode 100644 (file)
index 0000000..ce81378
--- /dev/null
@@ -0,0 +1 @@
+logging.level.de.trion=debug