From a249a48808e326b7d8a847e7429129c526dd9539 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 7 Jun 2020 19:54:25 +0200 Subject: [PATCH] TMP --- README.sh | 2 + validate-user/order-validation.avsc | 2 +- .../validateuser/ApplicationProperties.java | 11 +++ .../validateuser/ValidateUserService.java | 75 ++++++++++++++++++- 4 files changed, 88 insertions(+), 2 deletions(-) diff --git a/README.sh b/README.sh index 46c310c..0e811ac 100755 --- a/README.sh +++ b/README.sh @@ -22,6 +22,8 @@ docker-compose up -d zookeeper kafka schema-registry while ! [[ $(zookeeper-shell zookeeper:2181 ls /brokers/ids 2> /dev/null) =~ 1001 ]]; do echo "Waiting for kafka..."; sleep 1; done kafka-topics --zookeeper zookeeper:2181 --if-not-exists --create --replication-factor 1 --partitions 5 --topic orders +kafka-topics --zookeeper zookeeper:2181 --if-not-exists --create --replication-factor 1 --partitions 5 --topic users +kafka-topics --zookeeper zookeeper:2181 --if-not-exists --create --replication-factor 1 --partitions 5 --topic users_orders kafka-topics --zookeeper zookeeper:2181 --if-not-exists --create --replication-factor 1 --partitions 5 --topic validation docker-compose up -d take-order validate-order validate-user validation-results details diff --git a/validate-user/order-validation.avsc b/validate-user/order-validation.avsc index 9db183d..f4ff41b 100644 --- a/validate-user/order-validation.avsc +++ b/validate-user/order-validation.avsc @@ -3,7 +3,7 @@ "namespace": "de.trion.microservices.avro", "type": "enum", "name": "OrderValidationType", - "symbols" : [ "ORDER_DETAILS_CHECK" ] + "symbols" : [ "ORDER_DETAILS_CHECK", "ORDER_USER_CHECK" ] }, { "namespace": "de.trion.microservices.avro", diff --git a/validate-user/src/main/java/de/trion/microservices/validateuser/ApplicationProperties.java b/validate-user/src/main/java/de/trion/microservices/validateuser/ApplicationProperties.java index 376afdb..2fad22d 100644 --- a/validate-user/src/main/java/de/trion/microservices/validateuser/ApplicationProperties.java +++ b/validate-user/src/main/java/de/trion/microservices/validateuser/ApplicationProperties.java @@ -10,6 +10,7 @@ public class ApplicationProperties String bootstrapServers = "kafka:9092"; String schemaRegistryUrl = "http://schema-registry:8081"; String ordersTopic = "orders"; + String usersTopic = "users"; String validationTopic = "validation"; @@ -43,6 +44,16 @@ public class ApplicationProperties this.ordersTopic = topic; } + public String getUsersTopic() + { + return usersTopic; + } + + public void setUsersTopic(String topic) + { + this.usersTopic = topic; + } + public String getValidationTopic() { return validationTopic; diff --git a/validate-user/src/main/java/de/trion/microservices/validateuser/ValidateUserService.java b/validate-user/src/main/java/de/trion/microservices/validateuser/ValidateUserService.java index a1ddd6b..a006fe1 100644 --- a/validate-user/src/main/java/de/trion/microservices/validateuser/ValidateUserService.java +++ b/validate-user/src/main/java/de/trion/microservices/validateuser/ValidateUserService.java @@ -1,12 +1,16 @@ package de.trion.microservices.validateuser; +import de.trion.microservices.avro.CustomerLevel; import de.trion.microservices.avro.Order; +import de.trion.microservices.avro.OrderAndUser; import de.trion.microservices.avro.OrderState; import de.trion.microservices.avro.OrderValidation; +import de.trion.microservices.avro.OrderValidation.Builder; import static de.trion.microservices.avro.OrderValidationResult.FAIL; import static de.trion.microservices.avro.OrderValidationResult.PASS; -import static de.trion.microservices.avro.OrderValidationType.ORDER_DETAILS_CHECK; +import static de.trion.microservices.avro.OrderValidationType.ORDER_USER_CHECK; +import de.trion.microservices.avro.User; import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde; import java.util.LinkedList; import java.util.List; @@ -15,8 +19,15 @@ import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.Topology.AutoOffsetReset; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Joined; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Produced; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; @@ -28,6 +39,7 @@ public class ValidateUserService final static Logger LOG = LoggerFactory.getLogger(ValidateUserService.class); private final String orders; + private final String users; private final String validation; private final KafkaStreams streams; @@ -35,6 +47,7 @@ public class ValidateUserService public ValidateUserService(ApplicationProperties config) { orders = config.ordersTopic; + users = config.usersTopic; validation = config.validationTopic; Properties properties = new Properties(); @@ -46,6 +59,66 @@ public class ValidateUserService StreamsBuilder builder = new StreamsBuilder(); + KTable usersTable = + builder + .table( + users, + Consumed.with(Serdes.Long(), null, null, AutoOffsetReset.EARLIEST), + Materialized.as(users)); + + builder + .stream(orders) + .filter((id, order) -> order.getState() == OrderState.CREATED) + .map((id,order) -> + { + return new KeyValue<>(order.getCustomerId(), id); + }) + .through(users + "_" + orders, Produced.with(Serdes.Long(), Serdes.String())) + .leftJoin( + usersTable, + (order, user) -> + { + return + OrderAndUser + .newBuilder() + .setOrderId(order) + .setUser(user) + .build(); + }, + Joined.keySerde(Serdes.Long())) + .map((id, orderAndUser) -> + { + List messages = new LinkedList<>(); + + Builder validation = + OrderValidation + .newBuilder() + .setOrderId(orderAndUser.getOrderId()) + .setCheckType(ORDER_USER_CHECK) + .setMessages(messages); + + if (orderAndUser.getUser() == null) + return + new KeyValue<>( + orderAndUser.getOrderId().toString(), + validation.setValidationResult(FAIL).build()); + + User user = orderAndUser.getUser(); + + if (user.getLevel() != CustomerLevel.UNWANTED) + return + new KeyValue<>( + orderAndUser.getOrderId().toString(), + validation.setValidationResult(PASS).build()); + + messages.add("User is UNWANTED: " + user.getName()); + return + new KeyValue<>( + orderAndUser.getOrderId().toString(), + validation.setValidationResult(FAIL).build()); + }) + .to(validation); + Topology topology = builder.build(); streams = new KafkaStreams(topology, properties); streams.setUncaughtExceptionHandler((Thread t, Throwable e) -> -- 2.20.1