TMP
[demos/microservices] / validate-user / src / main / java / de / trion / microservices / validateuser / ValidateUserService.java
index a1ddd6b..a006fe1 100644 (file)
@@ -1,12 +1,16 @@
 package de.trion.microservices.validateuser;
 
 
+import de.trion.microservices.avro.CustomerLevel;
 import de.trion.microservices.avro.Order;
+import de.trion.microservices.avro.OrderAndUser;
 import de.trion.microservices.avro.OrderState;
 import de.trion.microservices.avro.OrderValidation;
+import de.trion.microservices.avro.OrderValidation.Builder;
 import static de.trion.microservices.avro.OrderValidationResult.FAIL;
 import static de.trion.microservices.avro.OrderValidationResult.PASS;
-import static de.trion.microservices.avro.OrderValidationType.ORDER_DETAILS_CHECK;
+import static de.trion.microservices.avro.OrderValidationType.ORDER_USER_CHECK;
+import de.trion.microservices.avro.User;
 import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
 import java.util.LinkedList;
 import java.util.List;
@@ -15,8 +19,15 @@ import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.Topology.AutoOffsetReset;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Joined;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Produced;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
@@ -28,6 +39,7 @@ public class ValidateUserService
   final static Logger LOG = LoggerFactory.getLogger(ValidateUserService.class);
 
   private final String orders;
+  private final String users;
   private final String validation;
   private final KafkaStreams streams;
 
@@ -35,6 +47,7 @@ public class ValidateUserService
   public ValidateUserService(ApplicationProperties config)
   {
     orders = config.ordersTopic;
+    users = config.usersTopic;
     validation = config.validationTopic;
 
     Properties properties = new Properties();
@@ -46,6 +59,66 @@ public class ValidateUserService
 
     StreamsBuilder builder = new StreamsBuilder();
 
+    KTable<Long, User> usersTable =
+        builder
+            .table(
+                users,
+                Consumed.with(Serdes.Long(), null, null, AutoOffsetReset.EARLIEST),
+                Materialized.as(users));
+
+    builder
+        .<String,Order>stream(orders)
+        .filter((id, order) -> order.getState() == OrderState.CREATED)
+        .map((id,order) ->
+        {
+          return new KeyValue<>(order.getCustomerId(), id);
+        })
+        .through(users + "_" + orders, Produced.with(Serdes.Long(), Serdes.String()))
+        .leftJoin(
+            usersTable,
+            (order, user) ->
+            {
+              return
+                  OrderAndUser
+                      .newBuilder()
+                      .setOrderId(order)
+                      .setUser(user)
+                      .build();
+            },
+            Joined.keySerde(Serdes.Long()))
+        .map((id, orderAndUser) ->
+        {
+          List<CharSequence> messages = new LinkedList<>();
+
+          Builder validation =
+              OrderValidation
+                  .newBuilder()
+                  .setOrderId(orderAndUser.getOrderId())
+                  .setCheckType(ORDER_USER_CHECK)
+                  .setMessages(messages);
+
+          if (orderAndUser.getUser() == null)
+            return
+                new KeyValue<>(
+                    orderAndUser.getOrderId().toString(),
+                    validation.setValidationResult(FAIL).build());
+
+          User user = orderAndUser.getUser();
+
+          if (user.getLevel() != CustomerLevel.UNWANTED)
+            return
+                new KeyValue<>(
+                    orderAndUser.getOrderId().toString(),
+                    validation.setValidationResult(PASS).build());
+
+          messages.add("User is UNWANTED: " + user.getName());
+          return
+              new KeyValue<>(
+                  orderAndUser.getOrderId().toString(),
+                  validation.setValidationResult(FAIL).build());
+        })
+        .to(validation);
+
     Topology topology = builder.build();
     streams = new KafkaStreams(topology, properties);
     streams.setUncaughtExceptionHandler((Thread t, Throwable e) ->