From 4f784f887f530419d66700b3e4e379c7ff36340a Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 7 Jun 2020 13:00:14 +0200 Subject: [PATCH] =?utf8?q?streams=20-=20=C3=9Cbungen=20-=20Microservices?= =?utf8?q?=20-=20Schritt=2003=20--=20Validate-Order-Service=20implementier?= =?utf8?q?t?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit * Lauscht auf dem Topic "orders" * Beachtet nur Order-Aufträge im Zustand CREATED * Prüft, dass "consumerID", "productID" und "quantity" größer 0 sind * Wenn ja: "PASS", ansonsten "FAIL" --- .gitignore | 2 + README.sh | 6 +- details/pom.xml | 2 +- docker-compose.yml | 10 ++ pom.xml | 3 +- take-order/pom.xml | 2 +- validate-order/Dockerfile | 5 + validate-order/order-validation.avsc | 31 ++++++ validate-order/order.avsc | 21 ++++ validate-order/pom.xml | 80 ++++++++++++++ .../validateorder/Application.java | 22 ++++ .../validateorder/ApplicationProperties.java | 55 ++++++++++ .../validateorder/ValidateOrderService.java | 101 ++++++++++++++++++ .../src/main/resources/application.properties | 1 + 14 files changed, 337 insertions(+), 4 deletions(-) create mode 100644 validate-order/Dockerfile create mode 100644 validate-order/order-validation.avsc create mode 100644 validate-order/order.avsc create mode 100644 validate-order/pom.xml create mode 100644 validate-order/src/main/java/de/trion/microservices/validateorder/Application.java create mode 100644 validate-order/src/main/java/de/trion/microservices/validateorder/ApplicationProperties.java create mode 100644 validate-order/src/main/java/de/trion/microservices/validateorder/ValidateOrderService.java create mode 100644 validate-order/src/main/resources/application.properties diff --git a/.gitignore b/.gitignore index df2dd73..a3e1a2e 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,5 @@ take-order/src/main/java/de/trion/microservices/avro take-order/target details/src/main/java/de/trion/microservices/avro details/target +validate-order/src/main/java/de/trion/microservices/avro +validate-order/target diff --git a/README.sh b/README.sh index b357ba3..3437cc7 100755 --- a/README.sh +++ b/README.sh @@ -13,18 +13,22 @@ mvn package docker build -t trion/take-order-service:01 take-order docker build -t trion/details-service:02 details +docker build -t trion/validate-order-service:03 validate-order docker-compose up -d zookeeper kafka schema-registry while ! [[ $(zookeeper-shell zookeeper:2181 ls /brokers/ids 2> /dev/null) =~ 1001 ]]; do echo "Waiting for kafka..."; sleep 1; done kafka-topics --zookeeper zookeeper:2181 --if-not-exists --create --replication-factor 1 --partitions 5 --topic orders +kafka-topics --zookeeper zookeeper:2181 --if-not-exists --create --replication-factor 1 --partitions 5 --topic validation -docker-compose up -d take-order details +docker-compose up -d take-order validate-order details kafka-avro-console-consumer --bootstrap-server kafka:9092 --topic orders & +kafka-avro-console-consumer --bootstrap-server kafka:9092 --topic validation & while ! [[ $(http 0:8091/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for take-order..."; sleep 1; done while ! [[ $(http 0:8092/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for details..."; sleep 1; done +while ! [[ $(http 0:8093/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for validate-order..."; sleep 1; done http -v post 0:8091/orders Accept:*/* id=1 customerId=2 productId=234 quantity=5 http -v post 0:8091/orders Accept:*/* id=1 customerId=2 productId=234 quantity= http -v post 0:8091/orders Accept:*/* id=1 customerId=2 productId=234 quantity=-5 diff --git a/details/pom.xml b/details/pom.xml index 492874b..95238f8 100644 --- a/details/pom.xml +++ b/details/pom.xml @@ -5,7 +5,7 @@ de.trion.kafka.microservices order-example - 02 + 03 de.trion.kafka.microservices diff --git a/docker-compose.yml b/docker-compose.yml index 7714d3a..67d1327 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -42,6 +42,16 @@ services: - kafka - schema-registry + validate-order: + image: trion/validate-order-service:03 + hostname: validate-order + ports: + - "8093:8080" + depends_on: + - zookeeper + - kafka + - schema-registry + details: image: trion/details-service:02 hostname: details diff --git a/pom.xml b/pom.xml index 88ebc8b..ce8d87a 100644 --- a/pom.xml +++ b/pom.xml @@ -11,7 +11,7 @@ de.trion.kafka.microservices order-example Order Example - 02 + 03 pom @@ -23,6 +23,7 @@ take-order details + validate-order diff --git a/take-order/pom.xml b/take-order/pom.xml index f5ad727..bf46f8e 100644 --- a/take-order/pom.xml +++ b/take-order/pom.xml @@ -5,7 +5,7 @@ de.trion.kafka.microservices order-example - 02 + 03 de.trion.kafka.microservices diff --git a/validate-order/Dockerfile b/validate-order/Dockerfile new file mode 100644 index 0000000..a32db2c --- /dev/null +++ b/validate-order/Dockerfile @@ -0,0 +1,5 @@ +FROM openjdk:8-jre-slim +COPY target/validate-order-03-SNAPSHOT.jar /opt/ +EXPOSE 8080 +ENTRYPOINT ["java", "-jar", "/opt/validate-order-03-SNAPSHOT.jar"] +CMD [] diff --git a/validate-order/order-validation.avsc b/validate-order/order-validation.avsc new file mode 100644 index 0000000..9db183d --- /dev/null +++ b/validate-order/order-validation.avsc @@ -0,0 +1,31 @@ +[ +{ + "namespace": "de.trion.microservices.avro", + "type": "enum", + "name": "OrderValidationType", + "symbols" : [ "ORDER_DETAILS_CHECK" ] +}, +{ + "namespace": "de.trion.microservices.avro", + "type": "enum", + "name": "OrderValidationResult", + "symbols" : [ "PASS", "FAIL" ] +}, +{ + "namespace": "de.trion.microservices.avro", + "type": "record", + "name": "OrderValidation", + "fields": [ + { "name": "orderId", "type": "string" }, + { "name": "checkType", "type": "OrderValidationType" }, + { "name": "validationResult", "type": "OrderValidationResult" }, + { + "name": "messages", + "type": + { + "type": "array", + "items": "string" + } + } + ] +}] diff --git a/validate-order/order.avsc b/validate-order/order.avsc new file mode 100644 index 0000000..c857b50 --- /dev/null +++ b/validate-order/order.avsc @@ -0,0 +1,21 @@ +[ + { + "namespace": "de.trion.microservices.avro", + "type": "enum", + "name": "OrderState", + "symbols" : [ "CREATED" ] + }, + { + "namespace": "de.trion.microservices.avro", + "type": "record", + "name": "Order", + "fields": [ + { "name": "id", "type": "string" }, + { "name": "state", "type": "OrderState" }, + { "name": "customerId", "type": "long" }, + { "name": "orderId", "type": "long" }, + { "name": "productId", "type": "long" }, + { "name": "quantity", "type": "int" } + ] + } +] diff --git a/validate-order/pom.xml b/validate-order/pom.xml new file mode 100644 index 0000000..63bded2 --- /dev/null +++ b/validate-order/pom.xml @@ -0,0 +1,80 @@ + + + 4.0.0 + + + de.trion.kafka.microservices + order-example + 03 + + + de.trion.kafka.microservices + validate-order + Order Validation Service + 03-SNAPSHOT + + + 1.9.0 + 5.3.0 + 2.3.0 + + + + + org.springframework.boot + spring-boot-starter-actuator + + + org.springframework.boot + spring-boot-starter-web + + + org.apache.kafka + kafka-streams + + + org.apache.avro + avro + ${avro.version} + + + io.confluent + kafka-streams-avro-serde + ${confluent.version} + + + + + + confluent + https://packages.confluent.io/maven/ + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + org.apache.avro + avro-maven-plugin + ${avro.version} + + + generate-sources + + schema + + + ${project.basedir}/ + ${project.basedir}/src/main/java/ + + + + + + + + diff --git a/validate-order/src/main/java/de/trion/microservices/validateorder/Application.java b/validate-order/src/main/java/de/trion/microservices/validateorder/Application.java new file mode 100644 index 0000000..bfdde7a --- /dev/null +++ b/validate-order/src/main/java/de/trion/microservices/validateorder/Application.java @@ -0,0 +1,22 @@ +package de.trion.microservices.validateorder; + + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.context.properties.EnableConfigurationProperties; + + +@SpringBootApplication +@EnableConfigurationProperties(ApplicationProperties.class) +public class Application +{ + private final static Logger LOG = LoggerFactory.getLogger(Application.class); + + + public static void main(String[] args) + { + SpringApplication.run(Application.class, args); + } +} \ No newline at end of file diff --git a/validate-order/src/main/java/de/trion/microservices/validateorder/ApplicationProperties.java b/validate-order/src/main/java/de/trion/microservices/validateorder/ApplicationProperties.java new file mode 100644 index 0000000..25d6515 --- /dev/null +++ b/validate-order/src/main/java/de/trion/microservices/validateorder/ApplicationProperties.java @@ -0,0 +1,55 @@ +package de.trion.microservices.validateorder; + + +import org.springframework.boot.context.properties.ConfigurationProperties; + + +@ConfigurationProperties("validate-order") +public class ApplicationProperties +{ + String bootstrapServers = "kafka:9092"; + String schemaRegistryUrl = "http://schema-registry:8081"; + String ordersTopic = "orders"; + String validationTopic = "validation"; + + + public String getBootstrapServers() + { + return bootstrapServers; + } + + public void setBootstrapServers(String bootstrapServers) + { + this.bootstrapServers = bootstrapServers; + } + + public String getSchemaRegistryUrl() + { + return schemaRegistryUrl; + } + + public void setSchemaRegistryUrl(String schemaRegistryUrl) + { + this.schemaRegistryUrl = schemaRegistryUrl; + } + + public String getOrdersTopic() + { + return ordersTopic; + } + + public void setOrdersTopic(String topic) + { + this.ordersTopic = topic; + } + + public String getValidationTopic() + { + return validationTopic; + } + + public void setValidationTopic(String topic) + { + this.validationTopic = topic; + } +} diff --git a/validate-order/src/main/java/de/trion/microservices/validateorder/ValidateOrderService.java b/validate-order/src/main/java/de/trion/microservices/validateorder/ValidateOrderService.java new file mode 100644 index 0000000..1ad46ce --- /dev/null +++ b/validate-order/src/main/java/de/trion/microservices/validateorder/ValidateOrderService.java @@ -0,0 +1,101 @@ +package de.trion.microservices.validateorder; + + +import de.trion.microservices.avro.Order; +import de.trion.microservices.avro.OrderState; +import de.trion.microservices.avro.OrderValidation; +import static de.trion.microservices.avro.OrderValidationResult.FAIL; +import static de.trion.microservices.avro.OrderValidationResult.PASS; +import static de.trion.microservices.avro.OrderValidationType.ORDER_DETAILS_CHECK; +import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde; +import java.util.LinkedList; +import java.util.List; +import java.util.Properties; +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.Topology; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + + +@Component +public class ValidateOrderService +{ + final static Logger LOG = LoggerFactory.getLogger(ValidateOrderService.class); + + private final String orders; + private final String validation; + private final KafkaStreams streams; + + + public ValidateOrderService(ApplicationProperties config) + { + orders = config.ordersTopic; + validation = config.validationTopic; + + Properties properties = new Properties(); + properties.put("bootstrap.servers", config.bootstrapServers); + properties.put("application.id", "validate-order"); + properties.put("schema.registry.url", config.schemaRegistryUrl); + properties.put("default.key.serde", Serdes.String().getClass()); + properties.put("default.value.serde", SpecificAvroSerde.class); + + StreamsBuilder builder = new StreamsBuilder(); + builder + .stream(orders) + .filter((id, order) -> order.getState() == OrderState.CREATED) + .mapValues((order) -> + { + List messages = new LinkedList<>(); + + if (order.getCustomerId() < 1) + messages.add("Customer-ID must be greater than 0"); + if (order.getProductId() < 1) + messages.add("Product-ID must be greater than 0"); + if (order.getQuantity() < 1) + messages.add("The ordered quantity must be greater than 0"); + + return + OrderValidation + .newBuilder() + .setOrderId(order.getId()) + .setCheckType(ORDER_DETAILS_CHECK) + .setValidationResult(messages.isEmpty() ? PASS : FAIL) + .setMessages(messages) + .build(); + }) + .to(validation); + + Topology topology = builder.build(); + streams = new KafkaStreams(topology, properties); + streams.setUncaughtExceptionHandler((Thread t, Throwable e) -> + { + LOG.error("Unexpected error in thread {}: {}", t, e.toString()); + try + { + streams.close(); + } + catch (Exception ex) + { + LOG.error("Could not close KafkaStreams!", ex); + } + }); + } + + + @PostConstruct + public void start() + { + streams.start(); + } + + @PreDestroy + public void stop() + { + streams.close(); + } +} diff --git a/validate-order/src/main/resources/application.properties b/validate-order/src/main/resources/application.properties new file mode 100644 index 0000000..ce81378 --- /dev/null +++ b/validate-order/src/main/resources/application.properties @@ -0,0 +1 @@ +logging.level.de.trion=debug -- 2.20.1