take-order/target
details/src/main/java/de/trion/microservices/avro
details/target
+validate-order/src/main/java/de/trion/microservices/avro
+validate-order/target
docker build -t trion/take-order-service:01 take-order
docker build -t trion/details-service:02 details
+docker build -t trion/validate-order-service:03 validate-order
docker-compose up -d zookeeper kafka schema-registry
while ! [[ $(zookeeper-shell zookeeper:2181 ls /brokers/ids 2> /dev/null) =~ 1001 ]]; do echo "Waiting for kafka..."; sleep 1; done
kafka-topics --zookeeper zookeeper:2181 --if-not-exists --create --replication-factor 1 --partitions 5 --topic orders
+kafka-topics --zookeeper zookeeper:2181 --if-not-exists --create --replication-factor 1 --partitions 5 --topic validation
-docker-compose up -d take-order details
+docker-compose up -d take-order validate-order details
kafka-avro-console-consumer --bootstrap-server kafka:9092 --topic orders &
+kafka-avro-console-consumer --bootstrap-server kafka:9092 --topic validation &
while ! [[ $(http 0:8091/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for take-order..."; sleep 1; done
while ! [[ $(http 0:8092/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for details..."; sleep 1; done
+while ! [[ $(http 0:8093/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for validate-order..."; sleep 1; done
http -v post 0:8091/orders Accept:*/* id=1 customerId=2 productId=234 quantity=5
http -v post 0:8091/orders Accept:*/* id=1 customerId=2 productId=234 quantity=
http -v post 0:8091/orders Accept:*/* id=1 customerId=2 productId=234 quantity=-5
<parent>
<groupId>de.trion.kafka.microservices</groupId>
<artifactId>order-example</artifactId>
- <version>02</version>
+ <version>03</version>
</parent>
<groupId>de.trion.kafka.microservices</groupId>
- kafka
- schema-registry
+ validate-order:
+ image: trion/validate-order-service:03
+ hostname: validate-order
+ ports:
+ - "8093:8080"
+ depends_on:
+ - zookeeper
+ - kafka
+ - schema-registry
+
details:
image: trion/details-service:02
hostname: details
<groupId>de.trion.kafka.microservices</groupId>
<artifactId>order-example</artifactId>
<name>Order Example</name>
- <version>02</version>
+ <version>03</version>
<packaging>pom</packaging>
<properties>
<modules>
<module>take-order</module>
<module>details</module>
+ <module>validate-order</module>
</modules>
</project>
<parent>
<groupId>de.trion.kafka.microservices</groupId>
<artifactId>order-example</artifactId>
- <version>02</version>
+ <version>03</version>
</parent>
<groupId>de.trion.kafka.microservices</groupId>
--- /dev/null
+FROM openjdk:8-jre-slim
+COPY target/validate-order-03-SNAPSHOT.jar /opt/
+EXPOSE 8080
+ENTRYPOINT ["java", "-jar", "/opt/validate-order-03-SNAPSHOT.jar"]
+CMD []
--- /dev/null
+[
+{
+ "namespace": "de.trion.microservices.avro",
+ "type": "enum",
+ "name": "OrderValidationType",
+ "symbols" : [ "ORDER_DETAILS_CHECK" ]
+},
+{
+ "namespace": "de.trion.microservices.avro",
+ "type": "enum",
+ "name": "OrderValidationResult",
+ "symbols" : [ "PASS", "FAIL" ]
+},
+{
+ "namespace": "de.trion.microservices.avro",
+ "type": "record",
+ "name": "OrderValidation",
+ "fields": [
+ { "name": "orderId", "type": "string" },
+ { "name": "checkType", "type": "OrderValidationType" },
+ { "name": "validationResult", "type": "OrderValidationResult" },
+ {
+ "name": "messages",
+ "type":
+ {
+ "type": "array",
+ "items": "string"
+ }
+ }
+ ]
+}]
--- /dev/null
+[
+ {
+ "namespace": "de.trion.microservices.avro",
+ "type": "enum",
+ "name": "OrderState",
+ "symbols" : [ "CREATED" ]
+ },
+ {
+ "namespace": "de.trion.microservices.avro",
+ "type": "record",
+ "name": "Order",
+ "fields": [
+ { "name": "id", "type": "string" },
+ { "name": "state", "type": "OrderState" },
+ { "name": "customerId", "type": "long" },
+ { "name": "orderId", "type": "long" },
+ { "name": "productId", "type": "long" },
+ { "name": "quantity", "type": "int" }
+ ]
+ }
+]
--- /dev/null
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>de.trion.kafka.microservices</groupId>
+ <artifactId>order-example</artifactId>
+ <version>03</version>
+ </parent>
+
+ <groupId>de.trion.kafka.microservices</groupId>
+ <artifactId>validate-order</artifactId>
+ <name>Order Validation Service</name>
+ <version>03-SNAPSHOT</version>
+
+ <properties>
+ <avro.version>1.9.0</avro.version>
+ <confluent.version>5.3.0</confluent.version>
+ <kafka.version>2.3.0</kafka.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-actuator</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-web</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-streams</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ <version>${avro.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.confluent</groupId>
+ <artifactId>kafka-streams-avro-serde</artifactId>
+ <version>${confluent.version}</version>
+ </dependency>
+ </dependencies>
+
+ <repositories>
+ <repository>
+ <id>confluent</id>
+ <url>https://packages.confluent.io/maven/</url>
+ </repository>
+ </repositories>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-maven-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro-maven-plugin</artifactId>
+ <version>${avro.version}</version>
+ <executions>
+ <execution>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>schema</goal>
+ </goals>
+ <configuration>
+ <sourceDirectory>${project.basedir}/</sourceDirectory>
+ <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
--- /dev/null
+package de.trion.microservices.validateorder;
+
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+
+
+@SpringBootApplication
+@EnableConfigurationProperties(ApplicationProperties.class)
+public class Application
+{
+ private final static Logger LOG = LoggerFactory.getLogger(Application.class);
+
+
+ public static void main(String[] args)
+ {
+ SpringApplication.run(Application.class, args);
+ }
+}
\ No newline at end of file
--- /dev/null
+package de.trion.microservices.validateorder;
+
+
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+
+@ConfigurationProperties("take-order")
+public class ApplicationProperties
+{
+ String bootstrapServers = "kafka:9092";
+ String schemaRegistryUrl = "http://schema-registry:8081";
+ String ordersTopic = "orders";
+ String validationTopic = "validation";
+
+
+ public String getBootstrapServers()
+ {
+ return bootstrapServers;
+ }
+
+ public void setBootstrapServers(String bootstrapServers)
+ {
+ this.bootstrapServers = bootstrapServers;
+ }
+
+ public String getSchemaRegistryUrl()
+ {
+ return schemaRegistryUrl;
+ }
+
+ public void setSchemaRegistryUrl(String schemaRegistryUrl)
+ {
+ this.schemaRegistryUrl = schemaRegistryUrl;
+ }
+
+ public String getOrdersTopic()
+ {
+ return ordersTopic;
+ }
+
+ public void setOrdersTopic(String topic)
+ {
+ this.ordersTopic = topic;
+ }
+
+ public String getValidationTopic()
+ {
+ return validationTopic;
+ }
+
+ public void setValidationTopic(String topic)
+ {
+ this.validationTopic = topic;
+ }
+}
--- /dev/null
+package de.trion.microservices.validateorder;
+
+
+import de.trion.microservices.avro.Order;
+import de.trion.microservices.avro.OrderState;
+import de.trion.microservices.avro.OrderValidation;
+import static de.trion.microservices.avro.OrderValidationResult.FAIL;
+import static de.trion.microservices.avro.OrderValidationResult.PASS;
+import static de.trion.microservices.avro.OrderValidationType.ORDER_DETAILS_CHECK;
+import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.Topology;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+
+@Component
+public class ValidateOrderService
+{
+ final static Logger LOG = LoggerFactory.getLogger(ValidateOrderService.class);
+
+ private final String orders;
+ private final String validation;
+ private final KafkaStreams streams;
+
+
+ public ValidateOrderService(ApplicationProperties config)
+ {
+ orders = config.ordersTopic;
+ validation = config.validationTopic;
+
+ Properties properties = new Properties();
+ properties.put("bootstrap.servers", config.bootstrapServers);
+ properties.put("application.id", "validate-order");
+ properties.put("schema.registry.url", config.schemaRegistryUrl);
+ properties.put("default.key.serde", Serdes.String().getClass());
+ properties.put("default.value.serde", SpecificAvroSerde.class);
+
+ StreamsBuilder builder = new StreamsBuilder();
+ builder
+ .<String,Order>stream(orders)
+ .filter((id, order) -> order.getState() == OrderState.CREATED)
+ .mapValues((order) ->
+ {
+ List<CharSequence> messages = new LinkedList<>();
+
+ if (order.getCustomerId() < 1)
+ messages.add("Customer-ID must be greater than 0");
+ if (order.getProductId() < 1)
+ messages.add("Product-ID must be greater than 0");
+ if (order.getQuantity() < 1)
+ messages.add("The ordered quantity must be greater than 0");
+
+ return
+ OrderValidation
+ .newBuilder()
+ .setOrderId(order.getId())
+ .setCheckType(ORDER_DETAILS_CHECK)
+ .setValidationResult(messages.isEmpty() ? PASS : FAIL)
+ .setMessages(messages)
+ .build();
+ })
+ .to(validation);
+
+ Topology topology = builder.build();
+ streams = new KafkaStreams(topology, properties);
+ streams.setUncaughtExceptionHandler((Thread t, Throwable e) ->
+ {
+ LOG.error("Unexpected error in thread {}: {}", t, e.toString());
+ try
+ {
+ streams.close();
+ }
+ catch (Exception ex)
+ {
+ LOG.error("Could not close KafkaStreams!", ex);
+ }
+ });
+ }
+
+
+ @PostConstruct
+ public void start()
+ {
+ streams.start();
+ }
+
+ @PreDestroy
+ public void stop()
+ {
+ streams.close();
+ }
+}
--- /dev/null
+logging.level.de.trion=debug