From 8215014571f11cb3846340a222c99014defcd274 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 7 Jun 2020 10:42:39 +0200 Subject: [PATCH] =?utf8?q?streams=20-=20=C3=9Cbungen=20-=20Microservices?= =?utf8?q?=20-=20Schritt=2001=20--=20Microservice=20Take-Order=20implement?= =?utf8?q?iert?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit * Microservice implementiert, der neue Orders annimmt * Orders werden asynchron angenommen * HTTP-Antwort erfolgt erst, wenn Order erfolgreich in Topic geschrieben * Für jede Anfrage wird eine UUID generiert, die als Schlüssel fungiert * Bei Erfolg wird eine URI zurückgegeben, unter der die Order abfragbar ist --- .dockerignore | 2 + .gitignore | 2 + README.sh | 20 +++++ docker-compose.yml | 48 ++++++++++ pom.xml | 27 ++++++ take-order/Dockerfile | 4 + take-order/order.avsc | 21 +++++ take-order/pom.xml | 80 +++++++++++++++++ .../microservices/takeorder/Application.java | 41 +++++++++ .../takeorder/ApplicationProperties.java | 55 ++++++++++++ .../microservices/takeorder/OrderBean.java | 89 +++++++++++++++++++ .../takeorder/TakeOrderService.java | 86 ++++++++++++++++++ .../src/main/resources/application.properties | 3 + 13 files changed, 478 insertions(+) create mode 100644 .dockerignore create mode 100644 .gitignore create mode 100755 README.sh create mode 100644 docker-compose.yml create mode 100644 pom.xml create mode 100644 take-order/Dockerfile create mode 100644 take-order/order.avsc create mode 100644 take-order/pom.xml create mode 100644 take-order/src/main/java/de/trion/microservices/takeorder/Application.java create mode 100644 take-order/src/main/java/de/trion/microservices/takeorder/ApplicationProperties.java create mode 100644 take-order/src/main/java/de/trion/microservices/takeorder/OrderBean.java create mode 100644 take-order/src/main/java/de/trion/microservices/takeorder/TakeOrderService.java create mode 100644 take-order/src/main/resources/application.properties diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..1ad9963 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,2 @@ +* +!target/*.jar diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..4d57b56 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +take-order/src/main/java/de/trion/microservices/avro +take-order/target diff --git a/README.sh b/README.sh new file mode 100755 index 0000000..2b528ce --- /dev/null +++ b/README.sh @@ -0,0 +1,20 @@ +#!/bin/bash + +if [ "$1" = "cleanup" ] +then + docker-compose down + mvn clean + exit +fi + +mvn package + +docker build -t trion/take-order-service:0 take-order + +docker-compose up -d zookeeper kafka schema-registry + +while ! [[ $(zookeeper-shell zookeeper:2181 ls /brokers/ids 2> /dev/null) =~ 1001 ]]; do echo "Waiting for kafka..."; sleep 1; done + +kafka-topics --zookeeper zookeeper:2181 --if-not-exists --create --replication-factor 1 --partitions 5 --topic orders + +docker-compose up -d take-order diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..c9ef2c4 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,48 @@ +version: '3.2' +services: + zookeeper: + image: confluentinc/cp-zookeeper:5.3.0 + ports: + - "2181:2181" + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + + kafka: + image: confluentinc/cp-kafka:5.3.0 + ports: + - "9092:9092" + environment: + KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181" + KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka:9092" + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false" + depends_on: + - zookeeper + + schema-registry: + image: confluentinc/cp-schema-registry:5.3.0 + hostname: schema-registry + ports: + - "8081:8081" + environment: + SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka:9092 + SCHEMA_REGISTRY_HOST_NAME: schema-registry + SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081 + depends_on: + - zookeeper + - kafka + + take-order: + image: trion/take-order-service:0 + hostname: take-order + ports: + - "8091:8080" + depends_on: + - zookeeper + - kafka + - schema-registry + +networks: + default: + external: + name: trion diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..1753f10 --- /dev/null +++ b/pom.xml @@ -0,0 +1,27 @@ + + + 4.0.0 + + + org.springframework.boot + spring-boot-starter-parent + 2.1.6.RELEASE + + + de.trion.kafka.microservices + order-example + Order Example + 0-SNAPSHOT + pom + + + 1.9.0 + 5.3.0 + 2.3.0 + + + + take-order + + + diff --git a/take-order/Dockerfile b/take-order/Dockerfile new file mode 100644 index 0000000..6f51b96 --- /dev/null +++ b/take-order/Dockerfile @@ -0,0 +1,4 @@ +FROM openjdk:8-jre-slim +COPY target/take-order-0-SNAPSHOT.jar /opt/ +EXPOSE 8080 +CMD ["java", "-jar", "/opt/take-order-0-SNAPSHOT.jar"] diff --git a/take-order/order.avsc b/take-order/order.avsc new file mode 100644 index 0000000..6879085 --- /dev/null +++ b/take-order/order.avsc @@ -0,0 +1,21 @@ +[ + { + "namespace": "de.trion.microservices.avro", + "type": "enum", + "name": "OrderState", + "symbols" : ["CREATED", "VALIDATED", "INVALID"] + }, + { + "namespace": "de.trion.microservices.avro", + "type": "record", + "name": "Order", + "fields": [ + { "name": "id", "type": "string" }, + { "name": "state", "type": "OrderState" }, + { "name": "customerId", "type": "long" }, + { "name": "orderId", "type": "long" }, + { "name": "productId", "type": "long" }, + { "name": "quantity", "type": "int" } + ] + } +] diff --git a/take-order/pom.xml b/take-order/pom.xml new file mode 100644 index 0000000..016d3c6 --- /dev/null +++ b/take-order/pom.xml @@ -0,0 +1,80 @@ + + + 4.0.0 + + + de.trion.kafka.microservices + order-example + 0-SNAPSHOT + + + de.trion.kafka.microservices + take-order + Take Order Service + 0-SNAPSHOT + + + 1.9.0 + 5.3.0 + 2.3.0 + + + + + org.springframework.boot + spring-boot-starter-actuator + + + org.springframework.boot + spring-boot-starter-web + + + org.apache.kafka + kafka-streams + + + org.apache.avro + avro + ${avro.version} + + + io.confluent + kafka-streams-avro-serde + ${confluent.version} + + + + + + confluent + https://packages.confluent.io/maven/ + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + org.apache.avro + avro-maven-plugin + ${avro.version} + + + generate-sources + + schema + + + ${project.basedir}/ + ${project.basedir}/src/main/java/ + + + + + + + + diff --git a/take-order/src/main/java/de/trion/microservices/takeorder/Application.java b/take-order/src/main/java/de/trion/microservices/takeorder/Application.java new file mode 100644 index 0000000..2e34b85 --- /dev/null +++ b/take-order/src/main/java/de/trion/microservices/takeorder/Application.java @@ -0,0 +1,41 @@ +package de.trion.microservices.takeorder; + + +import de.trion.microservices.avro.Order; +import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer; +import java.util.Properties; +import java.util.UUID; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.common.serialization.UUIDSerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; + + +@SpringBootApplication +@EnableConfigurationProperties(ApplicationProperties.class) +public class Application +{ + private final static Logger LOG = LoggerFactory.getLogger(Application.class); + + + @Bean(destroyMethod = "close") + KafkaProducer producer(ApplicationProperties properties) + { + Properties props = new Properties(); + props.put("bootstrap.servers", properties.bootstrapServers); + props.put("schema.registry.url", properties.schemaRegistryUrl); + props.put("key.serializer", UUIDSerializer.class.getName()); + props.put("value.serializer", SpecificAvroSerializer.class.getName()); + + return new KafkaProducer<>(props); + } + + public static void main(String[] args) + { + SpringApplication.run(Application.class, args); + } +} \ No newline at end of file diff --git a/take-order/src/main/java/de/trion/microservices/takeorder/ApplicationProperties.java b/take-order/src/main/java/de/trion/microservices/takeorder/ApplicationProperties.java new file mode 100644 index 0000000..a407db6 --- /dev/null +++ b/take-order/src/main/java/de/trion/microservices/takeorder/ApplicationProperties.java @@ -0,0 +1,55 @@ +package de.trion.microservices.takeorder; + + +import org.springframework.boot.context.properties.ConfigurationProperties; + + +@ConfigurationProperties("take-order") +public class ApplicationProperties +{ + String bootstrapServers = "kafka:9092"; + String schemaRegistryUrl = "http://schema-registry:8081"; + String topic = "orders"; + String path = "http://details:8092/orders/"; + + + public String getBootstrapServers() + { + return bootstrapServers; + } + + public void setBootstrapServers(String bootstrapServers) + { + this.bootstrapServers = bootstrapServers; + } + + public String getSchemaRegistryUrl() + { + return schemaRegistryUrl; + } + + public void setSchemaRegistryUrl(String schemaRegistryUrl) + { + this.schemaRegistryUrl = schemaRegistryUrl; + } + + public String getTopic() + { + return topic; + } + + public void setTopic(String topic) + { + this.topic = topic; + } + + public String getPath() + { + return path; + } + + public void setPath(String path) + { + this.path = path; + } +} diff --git a/take-order/src/main/java/de/trion/microservices/takeorder/OrderBean.java b/take-order/src/main/java/de/trion/microservices/takeorder/OrderBean.java new file mode 100644 index 0000000..b5343aa --- /dev/null +++ b/take-order/src/main/java/de/trion/microservices/takeorder/OrderBean.java @@ -0,0 +1,89 @@ +package de.trion.microservices.takeorder; + +import javax.validation.constraints.Min; +import javax.validation.constraints.NotNull; + + +/** + * Simple DTO used by the REST interface + */ +public class OrderBean +{ + @NotNull(message = "Cannot be null") + @Min(value = 1, message = "Must be a positive number") + private long id; + @NotNull(message = "Cannot be null") + @Min(value = 1, message = "Must be a positive number") + private long customerId; + @NotNull(message = "Cannot be null") + @Min(value = 1, message = "Must be a positive number") + private long productId; + @NotNull(message = "Cannot be null") + @Min(value = 1, message = "Must be a positive number") + private int quantity; + + + public OrderBean() {} + + public OrderBean( + final long id, + final long customerId, + final long productId, + final int quantity) + { + this.id = id; + this.customerId = customerId; + this.productId = productId; + this.quantity = quantity; + } + + public long getId() + { + return id; + } + + public long getCustomerId() + { + return customerId; + } + + public long getProductId() + { + return productId; + } + + public int getQuantity() + { + return quantity; + } + + + @Override + public boolean equals(final Object o) { + if (this == o) + return true; + if (o == null || this.getClass() != o.getClass()) + return false; + + final OrderBean orderBean = (OrderBean) o; + + return this.customerId == orderBean.customerId; + } + + @Override + public String toString() + { + return "{" + + "id=" + id + + ", customerId=" + customerId + + ", productId=" + productId + + ", quantity=" + quantity + + '}'; + } + + @Override + public int hashCode() + { + return Long.hashCode(this.id); + } +} \ No newline at end of file diff --git a/take-order/src/main/java/de/trion/microservices/takeorder/TakeOrderService.java b/take-order/src/main/java/de/trion/microservices/takeorder/TakeOrderService.java new file mode 100644 index 0000000..cc62a43 --- /dev/null +++ b/take-order/src/main/java/de/trion/microservices/takeorder/TakeOrderService.java @@ -0,0 +1,86 @@ +package de.trion.microservices.takeorder; + + +import de.trion.microservices.avro.Order; +import de.trion.microservices.avro.OrderState; +import java.net.URI; +import java.util.UUID; +import javax.validation.Valid; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.context.request.async.DeferredResult; + + +@RestController +public class TakeOrderService +{ + private final static Logger LOG = LoggerFactory.getLogger(TakeOrderService.class); + + private final KafkaProducer producer; + private final String topic; + private final String path; + + + TakeOrderService( + final KafkaProducer producer, + final ApplicationProperties properties) + { + this.producer = producer; + this.topic = properties.getTopic(); + this.path = properties.getPath(); + } + + + @PostMapping( + path = "/orders", + consumes = MediaType.APPLICATION_JSON_UTF8_VALUE, + produces = MediaType.TEXT_PLAIN_VALUE) + public DeferredResult> placeOrder(@Valid @RequestBody OrderBean order) + { + DeferredResult> result = new DeferredResult<>(); + + try + { + UUID uuid = UUID.randomUUID(); + ProducerRecord record = + new ProducerRecord<>( + topic, + uuid, + Order + .newBuilder() + .setId(uuid.toString()) + .setState(OrderState.CREATED) + .setCustomerId(order.getCustomerId()) + .setOrderId(order.getId()) + .setProductId(order.getProductId()) + .setQuantity(order.getQuantity()) + .build()); + + producer.send(record, (metadata, exception) -> + { + if (exception != null) + { + LOG.error("Could not place order {}: {}", order, exception.toString()); + result.setErrorResult(exception); + return; + } + + result.setResult(ResponseEntity.created(URI.create(path + uuid)).build()); + }); + } + catch (Exception e) + { + LOG.error("Unexpected exception!", e); + result.setErrorResult(e); + } + + return result; + } +} \ No newline at end of file diff --git a/take-order/src/main/resources/application.properties b/take-order/src/main/resources/application.properties new file mode 100644 index 0000000..f22f985 --- /dev/null +++ b/take-order/src/main/resources/application.properties @@ -0,0 +1,3 @@ +management.endpoints.web.exposure.include=* + +logging.level.de.trion=info -- 2.20.1