From: Kai Moritz <kai@juplo.de> Date: Sun, 7 Jun 2020 11:00:14 +0000 (+0200) Subject: streams - Übungen - Microservices - Schritt 03 X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=4f784f887f530419d66700b3e4e379c7ff36340a;p=demos%2Fmicroservices streams - Übungen - Microservices - Schritt 03 -- Validate-Order-Service implementiert * Lauscht auf dem Topic "orders" * Beachtet nur Order-Aufträge im Zustand CREATED * Prüft, dass "consumerID", "productID" und "quantity" größer 0 sind * Wenn ja: "PASS", ansonsten "FAIL" --- diff --git a/.gitignore b/.gitignore index df2dd73..a3e1a2e 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,5 @@ take-order/src/main/java/de/trion/microservices/avro take-order/target details/src/main/java/de/trion/microservices/avro details/target +validate-order/src/main/java/de/trion/microservices/avro +validate-order/target diff --git a/README.sh b/README.sh index b357ba3..3437cc7 100755 --- a/README.sh +++ b/README.sh @@ -13,18 +13,22 @@ mvn package docker build -t trion/take-order-service:01 take-order docker build -t trion/details-service:02 details +docker build -t trion/validate-order-service:03 validate-order docker-compose up -d zookeeper kafka schema-registry while ! [[ $(zookeeper-shell zookeeper:2181 ls /brokers/ids 2> /dev/null) =~ 1001 ]]; do echo "Waiting for kafka..."; sleep 1; done kafka-topics --zookeeper zookeeper:2181 --if-not-exists --create --replication-factor 1 --partitions 5 --topic orders +kafka-topics --zookeeper zookeeper:2181 --if-not-exists --create --replication-factor 1 --partitions 5 --topic validation -docker-compose up -d take-order details +docker-compose up -d take-order validate-order details kafka-avro-console-consumer --bootstrap-server kafka:9092 --topic orders & +kafka-avro-console-consumer --bootstrap-server kafka:9092 --topic validation & while ! [[ $(http 0:8091/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for take-order..."; sleep 1; done while ! [[ $(http 0:8092/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for details..."; sleep 1; done +while ! [[ $(http 0:8093/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for validate-order..."; sleep 1; done http -v post 0:8091/orders Accept:*/* id=1 customerId=2 productId=234 quantity=5 http -v post 0:8091/orders Accept:*/* id=1 customerId=2 productId=234 quantity= http -v post 0:8091/orders Accept:*/* id=1 customerId=2 productId=234 quantity=-5 diff --git a/details/pom.xml b/details/pom.xml index 492874b..95238f8 100644 --- a/details/pom.xml +++ b/details/pom.xml @@ -5,7 +5,7 @@ <parent> <groupId>de.trion.kafka.microservices</groupId> <artifactId>order-example</artifactId> - <version>02</version> + <version>03</version> </parent> <groupId>de.trion.kafka.microservices</groupId> diff --git a/docker-compose.yml b/docker-compose.yml index 7714d3a..67d1327 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -42,6 +42,16 @@ services: - kafka - schema-registry + validate-order: + image: trion/validate-order-service:03 + hostname: validate-order + ports: + - "8093:8080" + depends_on: + - zookeeper + - kafka + - schema-registry + details: image: trion/details-service:02 hostname: details diff --git a/pom.xml b/pom.xml index 88ebc8b..ce8d87a 100644 --- a/pom.xml +++ b/pom.xml @@ -11,7 +11,7 @@ <groupId>de.trion.kafka.microservices</groupId> <artifactId>order-example</artifactId> <name>Order Example</name> - <version>02</version> + <version>03</version> <packaging>pom</packaging> <properties> @@ -23,6 +23,7 @@ <modules> <module>take-order</module> <module>details</module> + <module>validate-order</module> </modules> </project> diff --git a/take-order/pom.xml b/take-order/pom.xml index f5ad727..bf46f8e 100644 --- a/take-order/pom.xml +++ b/take-order/pom.xml @@ -5,7 +5,7 @@ <parent> <groupId>de.trion.kafka.microservices</groupId> <artifactId>order-example</artifactId> - <version>02</version> + <version>03</version> </parent> <groupId>de.trion.kafka.microservices</groupId> diff --git a/validate-order/Dockerfile b/validate-order/Dockerfile new file mode 100644 index 0000000..a32db2c --- /dev/null +++ b/validate-order/Dockerfile @@ -0,0 +1,5 @@ +FROM openjdk:8-jre-slim +COPY target/validate-order-03-SNAPSHOT.jar /opt/ +EXPOSE 8080 +ENTRYPOINT ["java", "-jar", "/opt/validate-order-03-SNAPSHOT.jar"] +CMD [] diff --git a/validate-order/order-validation.avsc b/validate-order/order-validation.avsc new file mode 100644 index 0000000..9db183d --- /dev/null +++ b/validate-order/order-validation.avsc @@ -0,0 +1,31 @@ +[ +{ + "namespace": "de.trion.microservices.avro", + "type": "enum", + "name": "OrderValidationType", + "symbols" : [ "ORDER_DETAILS_CHECK" ] +}, +{ + "namespace": "de.trion.microservices.avro", + "type": "enum", + "name": "OrderValidationResult", + "symbols" : [ "PASS", "FAIL" ] +}, +{ + "namespace": "de.trion.microservices.avro", + "type": "record", + "name": "OrderValidation", + "fields": [ + { "name": "orderId", "type": "string" }, + { "name": "checkType", "type": "OrderValidationType" }, + { "name": "validationResult", "type": "OrderValidationResult" }, + { + "name": "messages", + "type": + { + "type": "array", + "items": "string" + } + } + ] +}] diff --git a/validate-order/order.avsc b/validate-order/order.avsc new file mode 100644 index 0000000..c857b50 --- /dev/null +++ b/validate-order/order.avsc @@ -0,0 +1,21 @@ +[ + { + "namespace": "de.trion.microservices.avro", + "type": "enum", + "name": "OrderState", + "symbols" : [ "CREATED" ] + }, + { + "namespace": "de.trion.microservices.avro", + "type": "record", + "name": "Order", + "fields": [ + { "name": "id", "type": "string" }, + { "name": "state", "type": "OrderState" }, + { "name": "customerId", "type": "long" }, + { "name": "orderId", "type": "long" }, + { "name": "productId", "type": "long" }, + { "name": "quantity", "type": "int" } + ] + } +] diff --git a/validate-order/pom.xml b/validate-order/pom.xml new file mode 100644 index 0000000..63bded2 --- /dev/null +++ b/validate-order/pom.xml @@ -0,0 +1,80 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>de.trion.kafka.microservices</groupId> + <artifactId>order-example</artifactId> + <version>03</version> + </parent> + + <groupId>de.trion.kafka.microservices</groupId> + <artifactId>validate-order</artifactId> + <name>Order Validation Service</name> + <version>03-SNAPSHOT</version> + + <properties> + <avro.version>1.9.0</avro.version> + <confluent.version>5.3.0</confluent.version> + <kafka.version>2.3.0</kafka.version> + </properties> + + <dependencies> + <dependency> + <groupId>org.springframework.boot</groupId> + <artifactId>spring-boot-starter-actuator</artifactId> + </dependency> + <dependency> + <groupId>org.springframework.boot</groupId> + <artifactId>spring-boot-starter-web</artifactId> + </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-streams</artifactId> + </dependency> + <dependency> + <groupId>org.apache.avro</groupId> + <artifactId>avro</artifactId> + <version>${avro.version}</version> + </dependency> + <dependency> + <groupId>io.confluent</groupId> + <artifactId>kafka-streams-avro-serde</artifactId> + <version>${confluent.version}</version> + </dependency> + </dependencies> + + <repositories> + <repository> + <id>confluent</id> + <url>https://packages.confluent.io/maven/</url> + </repository> + </repositories> + + <build> + <plugins> + <plugin> + <groupId>org.springframework.boot</groupId> + <artifactId>spring-boot-maven-plugin</artifactId> + </plugin> + <plugin> + <groupId>org.apache.avro</groupId> + <artifactId>avro-maven-plugin</artifactId> + <version>${avro.version}</version> + <executions> + <execution> + <phase>generate-sources</phase> + <goals> + <goal>schema</goal> + </goals> + <configuration> + <sourceDirectory>${project.basedir}/</sourceDirectory> + <outputDirectory>${project.basedir}/src/main/java/</outputDirectory> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + +</project> diff --git a/validate-order/src/main/java/de/trion/microservices/validateorder/Application.java b/validate-order/src/main/java/de/trion/microservices/validateorder/Application.java new file mode 100644 index 0000000..bfdde7a --- /dev/null +++ b/validate-order/src/main/java/de/trion/microservices/validateorder/Application.java @@ -0,0 +1,22 @@ +package de.trion.microservices.validateorder; + + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.context.properties.EnableConfigurationProperties; + + +@SpringBootApplication +@EnableConfigurationProperties(ApplicationProperties.class) +public class Application +{ + private final static Logger LOG = LoggerFactory.getLogger(Application.class); + + + public static void main(String[] args) + { + SpringApplication.run(Application.class, args); + } +} \ No newline at end of file diff --git a/validate-order/src/main/java/de/trion/microservices/validateorder/ApplicationProperties.java b/validate-order/src/main/java/de/trion/microservices/validateorder/ApplicationProperties.java new file mode 100644 index 0000000..25d6515 --- /dev/null +++ b/validate-order/src/main/java/de/trion/microservices/validateorder/ApplicationProperties.java @@ -0,0 +1,55 @@ +package de.trion.microservices.validateorder; + + +import org.springframework.boot.context.properties.ConfigurationProperties; + + +@ConfigurationProperties("validate-order") +public class ApplicationProperties +{ + String bootstrapServers = "kafka:9092"; + String schemaRegistryUrl = "http://schema-registry:8081"; + String ordersTopic = "orders"; + String validationTopic = "validation"; + + + public String getBootstrapServers() + { + return bootstrapServers; + } + + public void setBootstrapServers(String bootstrapServers) + { + this.bootstrapServers = bootstrapServers; + } + + public String getSchemaRegistryUrl() + { + return schemaRegistryUrl; + } + + public void setSchemaRegistryUrl(String schemaRegistryUrl) + { + this.schemaRegistryUrl = schemaRegistryUrl; + } + + public String getOrdersTopic() + { + return ordersTopic; + } + + public void setOrdersTopic(String topic) + { + this.ordersTopic = topic; + } + + public String getValidationTopic() + { + return validationTopic; + } + + public void setValidationTopic(String topic) + { + this.validationTopic = topic; + } +} diff --git a/validate-order/src/main/java/de/trion/microservices/validateorder/ValidateOrderService.java b/validate-order/src/main/java/de/trion/microservices/validateorder/ValidateOrderService.java new file mode 100644 index 0000000..1ad46ce --- /dev/null +++ b/validate-order/src/main/java/de/trion/microservices/validateorder/ValidateOrderService.java @@ -0,0 +1,101 @@ +package de.trion.microservices.validateorder; + + +import de.trion.microservices.avro.Order; +import de.trion.microservices.avro.OrderState; +import de.trion.microservices.avro.OrderValidation; +import static de.trion.microservices.avro.OrderValidationResult.FAIL; +import static de.trion.microservices.avro.OrderValidationResult.PASS; +import static de.trion.microservices.avro.OrderValidationType.ORDER_DETAILS_CHECK; +import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde; +import java.util.LinkedList; +import java.util.List; +import java.util.Properties; +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.Topology; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + + +@Component +public class ValidateOrderService +{ + final static Logger LOG = LoggerFactory.getLogger(ValidateOrderService.class); + + private final String orders; + private final String validation; + private final KafkaStreams streams; + + + public ValidateOrderService(ApplicationProperties config) + { + orders = config.ordersTopic; + validation = config.validationTopic; + + Properties properties = new Properties(); + properties.put("bootstrap.servers", config.bootstrapServers); + properties.put("application.id", "validate-order"); + properties.put("schema.registry.url", config.schemaRegistryUrl); + properties.put("default.key.serde", Serdes.String().getClass()); + properties.put("default.value.serde", SpecificAvroSerde.class); + + StreamsBuilder builder = new StreamsBuilder(); + builder + .<String,Order>stream(orders) + .filter((id, order) -> order.getState() == OrderState.CREATED) + .mapValues((order) -> + { + List<CharSequence> messages = new LinkedList<>(); + + if (order.getCustomerId() < 1) + messages.add("Customer-ID must be greater than 0"); + if (order.getProductId() < 1) + messages.add("Product-ID must be greater than 0"); + if (order.getQuantity() < 1) + messages.add("The ordered quantity must be greater than 0"); + + return + OrderValidation + .newBuilder() + .setOrderId(order.getId()) + .setCheckType(ORDER_DETAILS_CHECK) + .setValidationResult(messages.isEmpty() ? PASS : FAIL) + .setMessages(messages) + .build(); + }) + .to(validation); + + Topology topology = builder.build(); + streams = new KafkaStreams(topology, properties); + streams.setUncaughtExceptionHandler((Thread t, Throwable e) -> + { + LOG.error("Unexpected error in thread {}: {}", t, e.toString()); + try + { + streams.close(); + } + catch (Exception ex) + { + LOG.error("Could not close KafkaStreams!", ex); + } + }); + } + + + @PostConstruct + public void start() + { + streams.start(); + } + + @PreDestroy + public void stop() + { + streams.close(); + } +} diff --git a/validate-order/src/main/resources/application.properties b/validate-order/src/main/resources/application.properties new file mode 100644 index 0000000..ce81378 --- /dev/null +++ b/validate-order/src/main/resources/application.properties @@ -0,0 +1 @@ +logging.level.de.trion=debug