package de.trion.microservices.validateuser;
+import de.trion.microservices.avro.CustomerLevel;
import de.trion.microservices.avro.Order;
+import de.trion.microservices.avro.OrderAndUser;
import de.trion.microservices.avro.OrderState;
import de.trion.microservices.avro.OrderValidation;
+import de.trion.microservices.avro.OrderValidation.Builder;
import static de.trion.microservices.avro.OrderValidationResult.FAIL;
import static de.trion.microservices.avro.OrderValidationResult.PASS;
-import static de.trion.microservices.avro.OrderValidationType.ORDER_DETAILS_CHECK;
+import static de.trion.microservices.avro.OrderValidationType.ORDER_USER_CHECK;
+import de.trion.microservices.avro.User;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import java.util.LinkedList;
import java.util.List;
import javax.annotation.PreDestroy;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.Topology.AutoOffsetReset;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Joined;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Produced;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
// Logger for this service (package-private as written).
final static Logger LOG = LoggerFactory.getLogger(ValidateUserService.class);
// Topic the incoming orders are consumed from.
private final String orders;
// Topic holding user data; presumably keyed by Long user id — see the
// Serdes.Long() key serde used for the users KTable in the constructor.
+ private final String users;
// Topic the OrderValidation results are written to.
private final String validation;
// The Kafka Streams instance running the validation topology.
private final KafkaStreams streams;
/**
 * Wires up the Kafka Streams topology that validates the user behind each
 * newly created order and emits an {@code OrderValidation} verdict
 * (check type ORDER_USER_CHECK) to the validation topic.
 *
 * NOTE(review): this chunk is a patch fragment (leftover '+' diff markers)
 * and is truncated below the uncaught-exception handler — review in full
 * file context before building.
 *
 * @param config supplies the orders/users/validation topic names
 */
public ValidateUserService(ApplicationProperties config)
{
orders = config.ordersTopic;
+ users = config.usersTopic;
validation = config.validationTopic;
// NOTE(review): no streams settings (application id, bootstrap servers,
// default serdes) are applied to 'properties' in the visible lines —
// confirm they are set before new KafkaStreams(topology, properties).
Properties properties = new Properties();
StreamsBuilder builder = new StreamsBuilder();
+ // Users topic as a materialized KTable keyed by Long user id, read from
+ // the earliest offset. The null value/timestamp-extractor arguments mean
+ // the configured defaults are used.
+ KTable<Long, User> usersTable =
+ builder
+ .table(
+ users,
+ Consumed.with(Serdes.Long(), null, null, AutoOffsetReset.EARLIEST),
+ Materialized.as(users));
+
+ builder
+ .<String,Order>stream(orders)
+ // Only freshly created orders need the user check.
+ .filter((id, order) -> order.getState() == OrderState.CREATED)
+ // Re-key by customer id (value becomes the order id) so the stream
+ // can be joined against usersTable, which is keyed by user id.
+ .map((id,order) ->
+ {
+ return new KeyValue<>(order.getCustomerId(), id);
+ })
+ // Explicit repartition topic so the re-keyed stream is co-partitioned
+ // with usersTable. NOTE(review): through() is deprecated in newer
+ // Kafka Streams releases in favor of repartition() — confirm version.
+ .through(users + "_" + orders, Produced.with(Serdes.Long(), Serdes.String()))
+ // Left join keeps orders with no matching user (user == null);
+ // that case is turned into a FAIL verdict below.
+ .leftJoin(
+ usersTable,
+ (order, user) ->
+ {
+ return
+ OrderAndUser
+ .newBuilder()
+ .setOrderId(order)
+ .setUser(user)
+ .build();
+ },
+ Joined.keySerde(Serdes.Long()))
+ // Build the validation verdict for this order/user pair, re-keyed by
+ // the order id (as a String) for the downstream validation topic.
+ .map((id, orderAndUser) ->
+ {
+ List<CharSequence> messages = new LinkedList<>();
+
+ Builder validation =
+ OrderValidation
+ .newBuilder()
+ .setOrderId(orderAndUser.getOrderId())
+ .setCheckType(ORDER_USER_CHECK)
+ .setMessages(messages);
+
+ // No user record exists for the customer id -> FAIL (no message).
+ if (orderAndUser.getUser() == null)
+ return
+ new KeyValue<>(
+ orderAndUser.getOrderId().toString(),
+ validation.setValidationResult(FAIL).build());
+
+ User user = orderAndUser.getUser();
+
+ // Any customer level other than UNWANTED passes the check.
+ if (user.getLevel() != CustomerLevel.UNWANTED)
+ return
+ new KeyValue<>(
+ orderAndUser.getOrderId().toString(),
+ validation.setValidationResult(PASS).build());
+
+ // UNWANTED customer: FAIL and record the reason for downstream readers.
+ messages.add("User is UNWANTED: " + user.getName());
+ return
+ new KeyValue<>(
+ orderAndUser.getOrderId().toString(),
+ validation.setValidationResult(FAIL).build());
+ })
+ .to(validation);
+
Topology topology = builder.build();
streams = new KafkaStreams(topology, properties);
// NOTE(review): handler body continues past the end of this chunk.
streams.setUncaughtExceptionHandler((Thread t, Throwable e) ->