streams - Übungen - Microservices - Schritt 01
authorKai Moritz <kai@juplo.de>
Sun, 7 Jun 2020 08:42:39 +0000 (10:42 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 7 Jun 2020 09:47:18 +0000 (11:47 +0200)
--
Microservice Take-Order implementiert

* Microservice implementiert, der neue Orders annimmt
* Orders werden asynchron angenommen
* HTTP-Antwort erfolgt erst, wenn Order erfolgreich in Topic geschrieben
* Für jede Anfrage wird eine UUID generiert, die als Schlüssel fungiert
* Bei Erfolg wird eine URI zurückgegeben, unter der die Order abfragbar ist

13 files changed:
.dockerignore [new file with mode: 0644]
.gitignore [new file with mode: 0644]
README.sh [new file with mode: 0755]
docker-compose.yml [new file with mode: 0644]
pom.xml [new file with mode: 0644]
take-order/Dockerfile [new file with mode: 0644]
take-order/order.avsc [new file with mode: 0644]
take-order/pom.xml [new file with mode: 0644]
take-order/src/main/java/de/trion/microservices/takeorder/Application.java [new file with mode: 0644]
take-order/src/main/java/de/trion/microservices/takeorder/ApplicationProperties.java [new file with mode: 0644]
take-order/src/main/java/de/trion/microservices/takeorder/OrderBean.java [new file with mode: 0644]
take-order/src/main/java/de/trion/microservices/takeorder/TakeOrderService.java [new file with mode: 0644]
take-order/src/main/resources/application.properties [new file with mode: 0644]

diff --git a/.dockerignore b/.dockerignore
new file mode 100644 (file)
index 0000000..1ad9963
--- /dev/null
@@ -0,0 +1,2 @@
+*
+!target/*.jar
diff --git a/.gitignore b/.gitignore
new file mode 100644 (file)
index 0000000..4d57b56
--- /dev/null
@@ -0,0 +1,2 @@
+take-order/src/main/java/de/trion/microservices/avro
+take-order/target
diff --git a/README.sh b/README.sh
new file mode 100755 (executable)
index 0000000..2b528ce
--- /dev/null
+++ b/README.sh
@@ -0,0 +1,20 @@
+#!/bin/bash
+
+if [ "$1" = "cleanup" ]
+then
+  docker-compose down
+  mvn clean
+  exit
+fi
+
+mvn package
+
+docker build -t trion/take-order-service:0 take-order
+
+docker-compose up -d zookeeper kafka schema-registry
+
+while ! [[ $(zookeeper-shell zookeeper:2181 ls /brokers/ids 2> /dev/null) =~ 1001 ]]; do echo "Waiting for kafka..."; sleep 1; done
+
+kafka-topics --zookeeper zookeeper:2181 --if-not-exists --create --replication-factor 1 --partitions 5 --topic orders
+
+docker-compose up -d take-order
diff --git a/docker-compose.yml b/docker-compose.yml
new file mode 100644 (file)
index 0000000..c9ef2c4
--- /dev/null
@@ -0,0 +1,48 @@
+version: '3.2'
+services:
+  zookeeper:
+    image: confluentinc/cp-zookeeper:5.3.0
+    ports:
+      - "2181:2181"
+    environment:
+      ZOOKEEPER_CLIENT_PORT: 2181
+
+  kafka:
+    image: confluentinc/cp-kafka:5.3.0
+    ports:
+      - "9092:9092"
+    environment:
+      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
+      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka:9092"
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
+    depends_on:
+      - zookeeper
+
+  schema-registry:
+    image: confluentinc/cp-schema-registry:5.3.0
+    hostname: schema-registry
+    ports:
+      - "8081:8081"
+    environment:
+      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka:9092
+      SCHEMA_REGISTRY_HOST_NAME: schema-registry
+      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
+    depends_on:
+      - zookeeper
+      - kafka
+
+  take-order:
+    image: trion/take-order-service:0
+    hostname: take-order
+    ports:
+      - "8091:8080"
+    depends_on:
+      - zookeeper
+      - kafka
+      - schema-registry
+
+networks:
+  default:
+    external:
+      name: trion
diff --git a/pom.xml b/pom.xml
new file mode 100644 (file)
index 0000000..1753f10
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,27 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.springframework.boot</groupId>
+    <artifactId>spring-boot-starter-parent</artifactId>
+    <version>2.1.6.RELEASE</version>
+  </parent>
+
+  <groupId>de.trion.kafka.microservices</groupId>
+  <artifactId>order-example</artifactId>
+  <name>Order Example</name>
+  <version>0-SNAPSHOT</version>
+  <packaging>pom</packaging>
+
+  <properties>
+    <avro.version>1.9.0</avro.version>
+    <confluent.version>5.3.0</confluent.version>
+    <kafka.version>2.3.0</kafka.version>
+  </properties>
+
+  <modules>
+    <module>take-order</module>
+  </modules>
+
+</project>
diff --git a/take-order/Dockerfile b/take-order/Dockerfile
new file mode 100644 (file)
index 0000000..6f51b96
--- /dev/null
@@ -0,0 +1,4 @@
+FROM openjdk:8-jre-slim
+COPY target/take-order-0-SNAPSHOT.jar /opt/
+EXPOSE 8080
+CMD ["java", "-jar", "/opt/take-order-0-SNAPSHOT.jar"]
diff --git a/take-order/order.avsc b/take-order/order.avsc
new file mode 100644 (file)
index 0000000..6879085
--- /dev/null
@@ -0,0 +1,21 @@
+[
+  {
+    "namespace": "de.trion.microservices.avro",
+    "type": "enum",
+    "name": "OrderState",
+    "symbols" : ["CREATED", "VALIDATED", "INVALID"]
+  },
+  {
+    "namespace": "de.trion.microservices.avro",
+    "type": "record",
+    "name": "Order",
+    "fields": [
+      { "name": "id", "type": "string" },
+      { "name": "state", "type": "OrderState" },
+      { "name": "customerId", "type": "long" },
+      { "name": "orderId", "type": "long" },
+      { "name": "productId",  "type": "long" },
+      { "name": "quantity", "type": "int" }
+    ]
+  }
+]
diff --git a/take-order/pom.xml b/take-order/pom.xml
new file mode 100644 (file)
index 0000000..016d3c6
--- /dev/null
@@ -0,0 +1,80 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>de.trion.kafka.microservices</groupId>
+    <artifactId>order-example</artifactId>
+    <version>0-SNAPSHOT</version>
+  </parent>
+
+  <groupId>de.trion.kafka.microservices</groupId>
+  <artifactId>take-order</artifactId>
+  <name>Take Order Service</name>
+  <version>0-SNAPSHOT</version>
+
+  <properties>
+    <avro.version>1.9.0</avro.version>
+    <confluent.version>5.3.0</confluent.version>
+    <kafka.version>2.3.0</kafka.version>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-starter-actuator</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-starter-web</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-streams</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro</artifactId>
+      <version>${avro.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>io.confluent</groupId>
+      <artifactId>kafka-streams-avro-serde</artifactId>
+      <version>${confluent.version}</version>
+    </dependency>
+  </dependencies>
+
+  <repositories>
+    <repository>
+      <id>confluent</id>
+      <url>https://packages.confluent.io/maven/</url>
+    </repository>
+  </repositories>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.springframework.boot</groupId>
+        <artifactId>spring-boot-maven-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.avro</groupId>
+        <artifactId>avro-maven-plugin</artifactId>
+        <version>${avro.version}</version>
+        <executions>
+          <execution>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>schema</goal>
+            </goals>
+            <configuration>
+              <sourceDirectory>${project.basedir}/</sourceDirectory>
+              <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>
diff --git a/take-order/src/main/java/de/trion/microservices/takeorder/Application.java b/take-order/src/main/java/de/trion/microservices/takeorder/Application.java
new file mode 100644 (file)
index 0000000..2e34b85
--- /dev/null
@@ -0,0 +1,41 @@
+package de.trion.microservices.takeorder;
+
+
+import de.trion.microservices.avro.Order;
+import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer;
+import java.util.Properties;
+import java.util.UUID;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.common.serialization.UUIDSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+
+
+@SpringBootApplication
+@EnableConfigurationProperties(ApplicationProperties.class)
+public class Application
+{
+  private final static Logger LOG = LoggerFactory.getLogger(Application.class);
+
+
+  @Bean(destroyMethod = "close")
+  KafkaProducer<UUID,Order> producer(ApplicationProperties properties)
+  {
+    Properties props = new Properties();
+    props.put("bootstrap.servers", properties.bootstrapServers);
+    props.put("schema.registry.url", properties.schemaRegistryUrl);
+    props.put("key.serializer", UUIDSerializer.class.getName());
+    props.put("value.serializer", SpecificAvroSerializer.class.getName());
+
+    return new KafkaProducer<>(props);
+  }
+
+  public static void main(String[] args)
+  {
+    SpringApplication.run(Application.class, args);
+  }
+}
\ No newline at end of file
diff --git a/take-order/src/main/java/de/trion/microservices/takeorder/ApplicationProperties.java b/take-order/src/main/java/de/trion/microservices/takeorder/ApplicationProperties.java
new file mode 100644 (file)
index 0000000..a407db6
--- /dev/null
@@ -0,0 +1,55 @@
+package de.trion.microservices.takeorder;
+
+
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+
+@ConfigurationProperties("take-order")
+public class ApplicationProperties
+{
+  String bootstrapServers = "kafka:9092";
+  String schemaRegistryUrl = "http://schema-registry:8081";
+  String topic = "orders";
+  String path = "http://details:8092/orders/";
+
+
+  public String getBootstrapServers()
+  {
+    return bootstrapServers;
+  }
+
+  public void setBootstrapServers(String bootstrapServers)
+  {
+    this.bootstrapServers = bootstrapServers;
+  }
+
+  public String getSchemaRegistryUrl()
+  {
+    return schemaRegistryUrl;
+  }
+
+  public void setSchemaRegistryUrl(String schemaRegistryUrl)
+  {
+    this.schemaRegistryUrl = schemaRegistryUrl;
+  }
+
+  public String getTopic()
+  {
+    return topic;
+  }
+
+  public void setTopic(String topic)
+  {
+    this.topic = topic;
+  }
+
+  public String getPath()
+  {
+    return path;
+  }
+
+  public void setPath(String path)
+  {
+    this.path = path;
+  }
+}
diff --git a/take-order/src/main/java/de/trion/microservices/takeorder/OrderBean.java b/take-order/src/main/java/de/trion/microservices/takeorder/OrderBean.java
new file mode 100644 (file)
index 0000000..b5343aa
--- /dev/null
@@ -0,0 +1,89 @@
+package de.trion.microservices.takeorder;
+
+import javax.validation.constraints.Min;
+import javax.validation.constraints.NotNull;
+
+
+/**
+ * Simple DTO used by the REST interface
+ */
+public class OrderBean
+{
+  @NotNull(message = "Cannot be null")
+  @Min(value = 1, message = "Must be a positive number")
+  private long id;
+  @NotNull(message = "Cannot be null")
+  @Min(value = 1, message = "Must be a positive number")
+  private long customerId;
+  @NotNull(message = "Cannot be null")
+  @Min(value = 1, message = "Must be a positive number")
+  private long productId;
+  @NotNull(message = "Cannot be null")
+  @Min(value = 1, message = "Must be a positive number")
+  private int quantity;
+
+
+  public OrderBean() {}
+
+  public OrderBean(
+      final long id,
+      final long customerId,
+      final long productId,
+      final int quantity)
+  {
+    this.id = id;
+    this.customerId = customerId;
+    this.productId = productId;
+    this.quantity = quantity;
+  }
+
+  public long getId()
+  {
+    return id;
+  }
+
+  public long getCustomerId()
+  {
+    return customerId;
+  }
+
+  public long getProductId()
+  {
+    return productId;
+  }
+
+  public int getQuantity()
+  {
+    return quantity;
+  }
+
+
+  @Override
+  public boolean equals(final Object o) {
+    if (this == o)
+      return true;
+    if (o == null || this.getClass() != o.getClass())
+      return false;
+
+    final OrderBean orderBean = (OrderBean) o;
+
+    return this.customerId == orderBean.customerId;
+  }
+
+  @Override
+  public String toString()
+  {
+    return "{" +
+        "id=" + id +
+        ", customerId=" + customerId +
+        ", productId=" + productId +
+        ", quantity=" + quantity +
+        '}';
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Long.hashCode(this.id);
+  }
+}
\ No newline at end of file
diff --git a/take-order/src/main/java/de/trion/microservices/takeorder/TakeOrderService.java b/take-order/src/main/java/de/trion/microservices/takeorder/TakeOrderService.java
new file mode 100644 (file)
index 0000000..cc62a43
--- /dev/null
@@ -0,0 +1,86 @@
+package de.trion.microservices.takeorder;
+
+
+import de.trion.microservices.avro.Order;
+import de.trion.microservices.avro.OrderState;
+import java.net.URI;
+import java.util.UUID;
+import javax.validation.Valid;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.http.MediaType;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RestController;
+import org.springframework.web.context.request.async.DeferredResult;
+
+
+@RestController
+public class TakeOrderService
+{
+  private final static Logger LOG = LoggerFactory.getLogger(TakeOrderService.class);
+
+  private final KafkaProducer<UUID,Order> producer;
+  private final String topic;
+  private final String path;
+
+
+  TakeOrderService(
+      final KafkaProducer<UUID,Order> producer,
+      final ApplicationProperties properties)
+  {
+    this.producer = producer;
+    this.topic = properties.getTopic();
+    this.path = properties.getPath();
+  }
+
+
+  @PostMapping(
+      path = "/orders",
+      consumes = MediaType.APPLICATION_JSON_UTF8_VALUE,
+      produces = MediaType.TEXT_PLAIN_VALUE)
+  public DeferredResult<ResponseEntity<?>> placeOrder(@Valid @RequestBody OrderBean order)
+  {
+    DeferredResult<ResponseEntity<?>> result = new DeferredResult<>();
+
+    try
+    {
+      UUID uuid = UUID.randomUUID();
+      ProducerRecord<UUID, Order> record =
+          new ProducerRecord<>(
+              topic, 
+              uuid,
+              Order
+                  .newBuilder()
+                  .setId(uuid.toString())
+                  .setState(OrderState.CREATED)
+                  .setCustomerId(order.getCustomerId())
+                  .setOrderId(order.getId())
+                  .setProductId(order.getProductId())
+                  .setQuantity(order.getQuantity())
+                  .build());
+
+      producer.send(record, (metadata, exception) ->
+      {
+        if (exception != null)
+        {
+          LOG.error("Could not place order {}: {}", order, exception.toString());
+          result.setErrorResult(exception);
+          return;
+        }
+
+        result.setResult(ResponseEntity.created(URI.create(path + uuid)).build());
+      });
+    }
+    catch (Exception e)
+    {
+      LOG.error("Unexpected exception!", e);
+      result.setErrorResult(e);
+    }
+
+    return result;
+  }
+}
\ No newline at end of file
diff --git a/take-order/src/main/resources/application.properties b/take-order/src/main/resources/application.properties
new file mode 100644 (file)
index 0000000..f22f985
--- /dev/null
@@ -0,0 +1,3 @@
+management.endpoints.web.exposure.include=*
+
+logging.level.de.trion=info