TMP
authorKai Moritz <kai@juplo.de>
Sun, 7 Jun 2020 17:54:25 +0000 (19:54 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 7 Jun 2020 17:54:25 +0000 (19:54 +0200)
README.sh
validate-user/order-validation.avsc
validate-user/src/main/java/de/trion/microservices/validateuser/ApplicationProperties.java
validate-user/src/main/java/de/trion/microservices/validateuser/ValidateUserService.java

index 46c310c..0e811ac 100755 (executable)
--- a/README.sh
+++ b/README.sh
@@ -22,6 +22,8 @@ docker-compose up -d zookeeper kafka schema-registry
 while ! [[ $(zookeeper-shell zookeeper:2181 ls /brokers/ids 2> /dev/null) =~ 1001 ]]; do echo "Waiting for kafka..."; sleep 1; done
 
 kafka-topics --zookeeper zookeeper:2181 --if-not-exists --create --replication-factor 1 --partitions 5 --topic orders
+kafka-topics --zookeeper zookeeper:2181 --if-not-exists --create --replication-factor 1 --partitions 5 --topic users
+kafka-topics --zookeeper zookeeper:2181 --if-not-exists --create --replication-factor 1 --partitions 5 --topic users_orders
 kafka-topics --zookeeper zookeeper:2181 --if-not-exists --create --replication-factor 1 --partitions 5 --topic validation
 
 docker-compose up -d take-order validate-order validate-user validation-results details
index 9db183d..f4ff41b 100644 (file)
@@ -3,7 +3,7 @@
   "namespace": "de.trion.microservices.avro",
   "type": "enum",
   "name": "OrderValidationType",
-  "symbols" : [ "ORDER_DETAILS_CHECK" ]
+  "symbols" : [ "ORDER_DETAILS_CHECK", "ORDER_USER_CHECK" ]
 },
 {
   "namespace": "de.trion.microservices.avro",
index 376afdb..2fad22d 100644 (file)
@@ -10,6 +10,7 @@ public class ApplicationProperties
   String bootstrapServers = "kafka:9092";
   String schemaRegistryUrl = "http://schema-registry:8081";
   String ordersTopic = "orders";
+  String usersTopic = "users";
   String validationTopic = "validation";
 
 
@@ -43,6 +44,16 @@ public class ApplicationProperties
     this.ordersTopic = topic;
   }
 
+  public String getUsersTopic()
+  {
+    return usersTopic;
+  }
+
+  public void setUsersTopic(String topic)
+  {
+    this.usersTopic = topic;
+  }
+
   public String getValidationTopic()
   {
     return validationTopic;
index a1ddd6b..a006fe1 100644 (file)
@@ -1,12 +1,16 @@
 package de.trion.microservices.validateuser;
 
 
+import de.trion.microservices.avro.CustomerLevel;
 import de.trion.microservices.avro.Order;
+import de.trion.microservices.avro.OrderAndUser;
 import de.trion.microservices.avro.OrderState;
 import de.trion.microservices.avro.OrderValidation;
+import de.trion.microservices.avro.OrderValidation.Builder;
 import static de.trion.microservices.avro.OrderValidationResult.FAIL;
 import static de.trion.microservices.avro.OrderValidationResult.PASS;
-import static de.trion.microservices.avro.OrderValidationType.ORDER_DETAILS_CHECK;
+import static de.trion.microservices.avro.OrderValidationType.ORDER_USER_CHECK;
+import de.trion.microservices.avro.User;
 import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
 import java.util.LinkedList;
 import java.util.List;
@@ -15,8 +19,15 @@ import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.Topology.AutoOffsetReset;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Joined;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Produced;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
@@ -28,6 +39,7 @@ public class ValidateUserService
   final static Logger LOG = LoggerFactory.getLogger(ValidateUserService.class);
 
   private final String orders;
+  private final String users;
   private final String validation;
   private final KafkaStreams streams;
 
@@ -35,6 +47,7 @@ public class ValidateUserService
   public ValidateUserService(ApplicationProperties config)
   {
     orders = config.ordersTopic;
+    users = config.usersTopic;
     validation = config.validationTopic;
 
     Properties properties = new Properties();
@@ -46,6 +59,66 @@ public class ValidateUserService
 
     StreamsBuilder builder = new StreamsBuilder();
 
+    KTable<Long, User> usersTable =
+        builder
+            .table(
+                users,
+                Consumed.with(Serdes.Long(), null, null, AutoOffsetReset.EARLIEST),
+                Materialized.as(users));
+
+    builder
+        .<String,Order>stream(orders)
+        .filter((id, order) -> order.getState() == OrderState.CREATED)
+        .map((id,order) ->
+        {
+          return new KeyValue<>(order.getCustomerId(), id);
+        })
+        .through(users + "_" + orders, Produced.with(Serdes.Long(), Serdes.String()))
+        .leftJoin(
+            usersTable,
+            (order, user) ->
+            {
+              return
+                  OrderAndUser
+                      .newBuilder()
+                      .setOrderId(order)
+                      .setUser(user)
+                      .build();
+            },
+            Joined.keySerde(Serdes.Long()))
+        .map((id, orderAndUser) ->
+        {
+          List<CharSequence> messages = new LinkedList<>();
+
+          Builder validation =
+              OrderValidation
+                  .newBuilder()
+                  .setOrderId(orderAndUser.getOrderId())
+                  .setCheckType(ORDER_USER_CHECK)
+                  .setMessages(messages);
+
+          if (orderAndUser.getUser() == null)
+            return
+                new KeyValue<>(
+                    orderAndUser.getOrderId().toString(),
+                    validation.setValidationResult(FAIL).build());
+
+          User user = orderAndUser.getUser();
+
+          if (user.getLevel() != CustomerLevel.UNWANTED)
+            return
+                new KeyValue<>(
+                    orderAndUser.getOrderId().toString(),
+                    validation.setValidationResult(PASS).build());
+
+          messages.add("User is UNWANTED: " + user.getName());
+          return
+              new KeyValue<>(
+                  orderAndUser.getOrderId().toString(),
+                  validation.setValidationResult(FAIL).build());
+        })
+        .to(validation);
+
     Topology topology = builder.build();
     streams = new KafkaStreams(topology, properties);
     streams.setUncaughtExceptionHandler((Thread t, Throwable e) ->