X-Git-Url: https://juplo.de/gitweb/?p=demos%2Fmicroservices;a=blobdiff_plain;f=validate-user%2Fsrc%2Fmain%2Fjava%2Fde%2Ftrion%2Fmicroservices%2Fvalidateuser%2FValidateUserService.java;fp=validate-user%2Fsrc%2Fmain%2Fjava%2Fde%2Ftrion%2Fmicroservices%2Fvalidateuser%2FValidateUserService.java;h=a006fe1e94268e3f7e119bb6e914007ec8fcc6fb;hp=a1ddd6bb86e670a4ae61fa2dde9eb4b3c33be722;hb=a249a48808e326b7d8a847e7429129c526dd9539;hpb=0e92e2fed82ca65a5650a4c52c015e80ccb64618 diff --git a/validate-user/src/main/java/de/trion/microservices/validateuser/ValidateUserService.java b/validate-user/src/main/java/de/trion/microservices/validateuser/ValidateUserService.java index a1ddd6b..a006fe1 100644 --- a/validate-user/src/main/java/de/trion/microservices/validateuser/ValidateUserService.java +++ b/validate-user/src/main/java/de/trion/microservices/validateuser/ValidateUserService.java @@ -1,12 +1,16 @@ package de.trion.microservices.validateuser; +import de.trion.microservices.avro.CustomerLevel; import de.trion.microservices.avro.Order; +import de.trion.microservices.avro.OrderAndUser; import de.trion.microservices.avro.OrderState; import de.trion.microservices.avro.OrderValidation; +import de.trion.microservices.avro.OrderValidation.Builder; import static de.trion.microservices.avro.OrderValidationResult.FAIL; import static de.trion.microservices.avro.OrderValidationResult.PASS; -import static de.trion.microservices.avro.OrderValidationType.ORDER_DETAILS_CHECK; +import static de.trion.microservices.avro.OrderValidationType.ORDER_USER_CHECK; +import de.trion.microservices.avro.User; import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde; import java.util.LinkedList; import java.util.List; @@ -15,8 +19,15 @@ import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.Topology.AutoOffsetReset; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Joined; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Produced; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; @@ -28,6 +39,7 @@ public class ValidateUserService final static Logger LOG = LoggerFactory.getLogger(ValidateUserService.class); private final String orders; + private final String users; private final String validation; private final KafkaStreams streams; @@ -35,6 +47,7 @@ public class ValidateUserService public ValidateUserService(ApplicationProperties config) { orders = config.ordersTopic; + users = config.usersTopic; validation = config.validationTopic; Properties properties = new Properties(); @@ -46,6 +59,66 @@ public class ValidateUserService StreamsBuilder builder = new StreamsBuilder(); + KTable usersTable = + builder + .table( + users, + Consumed.with(Serdes.Long(), null, null, AutoOffsetReset.EARLIEST), + Materialized.as(users)); + + builder + .stream(orders) + .filter((id, order) -> order.getState() == OrderState.CREATED) + .map((id,order) -> + { + return new KeyValue<>(order.getCustomerId(), id); + }) + .through(users + "_" + orders, Produced.with(Serdes.Long(), Serdes.String())) + .leftJoin( + usersTable, + (order, user) -> + { + return + OrderAndUser + .newBuilder() + .setOrderId(order) + .setUser(user) + .build(); + }, + Joined.keySerde(Serdes.Long())) + .map((id, orderAndUser) -> + { + List messages = new LinkedList<>(); + + Builder validation = + OrderValidation + .newBuilder() + .setOrderId(orderAndUser.getOrderId()) + .setCheckType(ORDER_USER_CHECK) + .setMessages(messages); + + if (orderAndUser.getUser() == null) + return + new KeyValue<>( + orderAndUser.getOrderId().toString(), + validation.setValidationResult(FAIL).build()); + + User user = orderAndUser.getUser(); + + if (user.getLevel() != CustomerLevel.UNWANTED) + return + new KeyValue<>( + orderAndUser.getOrderId().toString(), + validation.setValidationResult(PASS).build()); + + messages.add("User is UNWANTED: " + user.getName()); + return + new KeyValue<>( + orderAndUser.getOrderId().toString(), + validation.setValidationResult(FAIL).build()); + }) + .to(validation); + Topology topology = builder.build(); streams = new KafkaStreams(topology, properties); streams.setUncaughtExceptionHandler((Thread t, Throwable e) ->