From: Kai Moritz Date: Sun, 7 Jun 2020 12:01:55 +0000 (+0200) Subject: streams - Übungen - Microservices - Schritt 03 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=9c32239c9f2889c34499623329c9bf801d0a4288;p=demos%2Fmicroservices streams - Übungen - Microservices - Schritt 03 -- Validation-Results-Service implementiert * Lauscht auf dem Topic "validations" * Passt den Status der Order entsprechend an * "APPROVED" bei "PASS", "DECLINED" bei "FAIL" * _Hintergrund:_ Single Writer -- Nur der Bounded Context "Order" greift schreibend auf das Topic "orders" zu! * _Beachte:_ Grenzen des Bounded Context sind nicht (müssen nicht) mit den Microservices übereinstimmen! + _Hier:_ BC "Order" besteht aus "take-order", "details", "validation-results" --- diff --git a/.gitignore b/.gitignore index a3e1a2e..a233c51 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,5 @@ details/src/main/java/de/trion/microservices/avro details/target validate-order/src/main/java/de/trion/microservices/avro validate-order/target +validation-results/src/main/java/de/trion/microservices/avro +validation-results/target diff --git a/README.sh b/README.sh index 3437cc7..c6d4d41 100755 --- a/README.sh +++ b/README.sh @@ -14,6 +14,7 @@ mvn package docker build -t trion/take-order-service:01 take-order docker build -t trion/details-service:02 details docker build -t trion/validate-order-service:03 validate-order +docker build -t trion/validation-results-service:03 validation-results docker-compose up -d zookeeper kafka schema-registry @@ -22,13 +23,14 @@ while ! [[ $(zookeeper-shell zookeeper:2181 ls /brokers/ids 2> /dev/null) =~ 100 kafka-topics --zookeeper zookeeper:2181 --if-not-exists --create --replication-factor 1 --partitions 5 --topic orders kafka-topics --zookeeper zookeeper:2181 --if-not-exists --create --replication-factor 1 --partitions 5 --topic validation -docker-compose up -d take-order validate-order details +docker-compose up -d take-order validate-order validation-results details kafka-avro-console-consumer --bootstrap-server kafka:9092 --topic orders & kafka-avro-console-consumer --bootstrap-server kafka:9092 --topic validation & while ! [[ $(http 0:8091/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for take-order..."; sleep 1; done while ! [[ $(http 0:8092/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for details..."; sleep 1; done while ! [[ $(http 0:8093/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for validate-order..."; sleep 1; done +while ! [[ $(http 0:8094/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for validation-results..."; sleep 1; done http -v post 0:8091/orders Accept:*/* id=1 customerId=2 productId=234 quantity=5 http -v post 0:8091/orders Accept:*/* id=1 customerId=2 productId=234 quantity= http -v post 0:8091/orders Accept:*/* id=1 customerId=2 productId=234 quantity=-5 diff --git a/docker-compose.yml b/docker-compose.yml index 67d1327..bc88907 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -52,6 +52,16 @@ services: - kafka - schema-registry + validation-results: + image: trion/validation-results-service:03 + hostname: validation-results + ports: + - "8094:8080" + depends_on: + - zookeeper + - kafka + - schema-registry + details: image: trion/details-service:02 hostname: details diff --git a/pom.xml b/pom.xml index ce8d87a..1cce338 100644 --- a/pom.xml +++ b/pom.xml @@ -24,6 +24,7 @@ take-order details validate-order + validation-results diff --git a/validation-results/Dockerfile b/validation-results/Dockerfile new file mode 100644 index 0000000..a5c8605 --- /dev/null +++ b/validation-results/Dockerfile @@ -0,0 +1,5 @@ +FROM openjdk:8-jre-slim +COPY target/validation-results-03-SNAPSHOT.jar /opt/ +EXPOSE 8080 +ENTRYPOINT [ "java", "-jar", "/opt/validation-results-03-SNAPSHOT.jar" ] +CMD [] diff --git a/validation-results/order-validation.avsc b/validation-results/order-validation.avsc new file mode 100644 index 0000000..9db183d --- /dev/null +++ b/validation-results/order-validation.avsc @@ -0,0 +1,31 @@ +[ +{ + "namespace": "de.trion.microservices.avro", + "type": "enum", + "name": "OrderValidationType", + "symbols" : [ "ORDER_DETAILS_CHECK" ] +}, +{ + "namespace": "de.trion.microservices.avro", + "type": "enum", + "name": "OrderValidationResult", + "symbols" : [ "PASS", "FAIL" ] +}, +{ + "namespace": "de.trion.microservices.avro", + "type": "record", + "name": "OrderValidation", + "fields": [ + { "name": "orderId", "type": "string" }, + { "name": "checkType", "type": "OrderValidationType" }, + { "name": "validationResult", "type": "OrderValidationResult" }, + { + "name": "messages", + "type": + { + "type": "array", + "items": "string" + } + } + ] +}] diff --git a/validation-results/order.avsc b/validation-results/order.avsc new file mode 100644 index 0000000..1ad0539 --- /dev/null +++ b/validation-results/order.avsc @@ -0,0 +1,21 @@ +[ + { + "namespace": "de.trion.microservices.avro", + "type": "enum", + "name": "OrderState", + "symbols" : ["CREATED", "APPROVED", "DECLINED"] + }, + { + "namespace": "de.trion.microservices.avro", + "type": "record", + "name": "Order", + "fields": [ + { "name": "id", "type": "string" }, + { "name": "state", "type": "OrderState" }, + { "name": "customerId", "type": "long" }, + { "name": "orderId", "type": "long" }, + { "name": "productId", "type": "long" }, + { "name": "quantity", "type": "int" } + ] + } +] diff --git a/validation-results/pom.xml b/validation-results/pom.xml new file mode 100644 index 0000000..e8f02c5 --- /dev/null +++ b/validation-results/pom.xml @@ -0,0 +1,80 @@ + + + 4.0.0 + + + de.trion.kafka.microservices + order-example + 03 + + + de.trion.kafka.microservices + validation-results + Validation Results Service + 03-SNAPSHOT + + + 1.9.0 + 5.3.0 + 2.3.0 + + + + + org.springframework.boot + spring-boot-starter-actuator + + + org.springframework.boot + spring-boot-starter-web + + + org.apache.kafka + kafka-streams + + + org.apache.avro + avro + ${avro.version} + + + io.confluent + kafka-streams-avro-serde + ${confluent.version} + + + + + + confluent + https://packages.confluent.io/maven/ + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + org.apache.avro + avro-maven-plugin + ${avro.version} + + + generate-sources + + schema + + + ${project.basedir}/ + ${project.basedir}/src/main/java/ + + + + + + + + diff --git a/validation-results/src/main/java/de/trion/microservices/validationresults/Application.java b/validation-results/src/main/java/de/trion/microservices/validationresults/Application.java new file mode 100644 index 0000000..43c0b92 --- /dev/null +++ b/validation-results/src/main/java/de/trion/microservices/validationresults/Application.java @@ -0,0 +1,22 @@ +package de.trion.microservices.validationresults; + + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.context.properties.EnableConfigurationProperties; + + +@SpringBootApplication +@EnableConfigurationProperties(ApplicationProperties.class) +public class Application +{ + private final static Logger LOG = LoggerFactory.getLogger(Application.class); + + + public static void main(String[] args) + { + SpringApplication.run(Application.class, args); + } +} \ No newline at end of file diff --git a/validation-results/src/main/java/de/trion/microservices/validationresults/ApplicationProperties.java b/validation-results/src/main/java/de/trion/microservices/validationresults/ApplicationProperties.java new file mode 100644 index 0000000..110a9d8 --- /dev/null +++ b/validation-results/src/main/java/de/trion/microservices/validationresults/ApplicationProperties.java @@ -0,0 +1,55 @@ +package de.trion.microservices.validationresults; + + +import org.springframework.boot.context.properties.ConfigurationProperties; + + +@ConfigurationProperties("validation-results") +public class ApplicationProperties +{ + String bootstrapServers = "kafka:9092"; + String schemaRegistryUrl = "http://schema-registry:8081"; + String ordersTopic = "orders"; + String validationTopic = "validation"; + + + public String getBootstrapServers() + { + return bootstrapServers; + } + + public void setBootstrapServers(String bootstrapServers) + { + this.bootstrapServers = bootstrapServers; + } + + public String getSchemaRegistryUrl() + { + return schemaRegistryUrl; + } + + public void setSchemaRegistryUrl(String schemaRegistryUrl) + { + this.schemaRegistryUrl = schemaRegistryUrl; + } + + public String getOrdersTopic() + { + return ordersTopic; + } + + public void setOrdersTopic(String topic) + { + this.ordersTopic = topic; + } + + public String getValidationTopic() + { + return validationTopic; + } + + public void setValidationTopic(String topic) + { + this.validationTopic = topic; + } +} diff --git a/validation-results/src/main/java/de/trion/microservices/validationresults/ValidationResultsService.java b/validation-results/src/main/java/de/trion/microservices/validationresults/ValidationResultsService.java new file mode 100644 index 0000000..3b3d94f --- /dev/null +++ b/validation-results/src/main/java/de/trion/microservices/validationresults/ValidationResultsService.java @@ -0,0 +1,92 @@ +package de.trion.microservices.validationresults; + + +import de.trion.microservices.avro.Order; +import de.trion.microservices.avro.OrderState; +import static de.trion.microservices.avro.OrderState.APPROVED; +import static de.trion.microservices.avro.OrderState.DECLINED; +import de.trion.microservices.avro.OrderValidation; +import static de.trion.microservices.avro.OrderValidationResult.PASS; +import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde; +import java.time.Duration; +import java.util.Properties; +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.kstream.JoinWindows; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + + +@Component +public class ValidationResultsService +{ + final static Logger LOG = LoggerFactory.getLogger(ValidationResultsService.class); + + private final String orders; + private final String validation; + private final KafkaStreams streams; + + + public ValidationResultsService(ApplicationProperties config) + { + orders = config.ordersTopic; + validation = config.validationTopic; + + Properties properties = new Properties(); + properties.put("bootstrap.servers", config.bootstrapServers); + properties.put("application.id", "validation-results"); + properties.put("schema.registry.url", config.schemaRegistryUrl); + properties.put("default.key.serde", Serdes.String().getClass()); + properties.put("default.value.serde", SpecificAvroSerde.class); + + StreamsBuilder builder = new StreamsBuilder(); + builder + .stream(orders) + .filter((id, order) -> order.getState() == OrderState.CREATED) + .join( + builder.stream(validation), + (order, outcome) -> + { + return + Order + .newBuilder(order) + .setState(outcome.getValidationResult() == PASS ? APPROVED : DECLINED) + .build(); + }, + JoinWindows.of(Duration.ofHours(1))) + .to(orders); + + Topology topology = builder.build(); + streams = new KafkaStreams(topology, properties); + streams.setUncaughtExceptionHandler((Thread t, Throwable e) -> + { + LOG.error("Unexpected error in thread {}: {}", t, e.toString()); + try + { + streams.close(); + } + catch (Exception ex) + { + LOG.error("Could not close KafkaStreams!", ex); + } + }); + } + + + @PostConstruct + public void start() + { + streams.start(); + } + + @PreDestroy + public void stop() + { + streams.close(); + } +} diff --git a/validation-results/src/main/resources/application.properties b/validation-results/src/main/resources/application.properties new file mode 100644 index 0000000..ce81378 --- /dev/null +++ b/validation-results/src/main/resources/application.properties @@ -0,0 +1 @@ +logging.level.de.trion=debug