1 package de.trion.microservices.validateuser;
4 import de.trion.microservices.avro.CustomerLevel;
5 import de.trion.microservices.avro.Order;
6 import de.trion.microservices.avro.OrderAndUser;
7 import de.trion.microservices.avro.OrderState;
8 import de.trion.microservices.avro.OrderValidation;
9 import de.trion.microservices.avro.OrderValidation.Builder;
10 import static de.trion.microservices.avro.OrderValidationResult.FAIL;
11 import static de.trion.microservices.avro.OrderValidationResult.PASS;
12 import static de.trion.microservices.avro.OrderValidationType.ORDER_USER_CHECK;
13 import de.trion.microservices.avro.User;
14 import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
15 import java.util.LinkedList;
16 import java.util.List;
17 import java.util.Properties;
18 import javax.annotation.PostConstruct;
19 import javax.annotation.PreDestroy;
20 import org.apache.kafka.common.serialization.Serdes;
21 import org.apache.kafka.streams.KafkaStreams;
22 import org.apache.kafka.streams.KeyValue;
23 import org.apache.kafka.streams.StreamsBuilder;
24 import org.apache.kafka.streams.Topology;
25 import org.apache.kafka.streams.Topology.AutoOffsetReset;
26 import org.apache.kafka.streams.kstream.Consumed;
27 import org.apache.kafka.streams.kstream.Joined;
28 import org.apache.kafka.streams.kstream.KTable;
29 import org.apache.kafka.streams.kstream.Materialized;
30 import org.apache.kafka.streams.kstream.Produced;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
33 import org.springframework.stereotype.Component;
37 public class ValidateUserService
39 final static Logger LOG = LoggerFactory.getLogger(ValidateUserService.class);
41 private final String orders;
42 private final String users;
43 private final String validation;
44 private final KafkaStreams streams;
47 public ValidateUserService(ApplicationProperties config)
49 orders = config.ordersTopic;
50 users = config.usersTopic;
51 validation = config.validationTopic;
53 Properties properties = new Properties();
54 properties.put("bootstrap.servers", config.bootstrapServers);
55 properties.put("application.id", "validate-user");
56 properties.put("schema.registry.url", config.schemaRegistryUrl);
57 properties.put("default.key.serde", Serdes.String().getClass());
58 properties.put("default.value.serde", SpecificAvroSerde.class);
60 StreamsBuilder builder = new StreamsBuilder();
62 KTable<Long, User> usersTable =
66 Consumed.with(Serdes.Long(), null, null, AutoOffsetReset.EARLIEST),
67 Materialized.as(users));
70 .<String,Order>stream(orders)
71 .filter((id, order) -> order.getState() == OrderState.CREATED)
74 return new KeyValue<>(order.getCustomerId(), id);
76 .through(users + "_" + orders, Produced.with(Serdes.Long(), Serdes.String()))
88 Joined.keySerde(Serdes.Long()))
89 .map((id, orderAndUser) ->
91 List<CharSequence> messages = new LinkedList<>();
96 .setOrderId(orderAndUser.getOrderId())
97 .setCheckType(ORDER_USER_CHECK)
98 .setMessages(messages);
100 if (orderAndUser.getUser() == null)
103 orderAndUser.getOrderId().toString(),
104 validation.setValidationResult(FAIL).build());
106 User user = orderAndUser.getUser();
108 if (user.getLevel() != CustomerLevel.UNWANTED)
111 orderAndUser.getOrderId().toString(),
112 validation.setValidationResult(PASS).build());
114 messages.add("User is UNWANTED: " + user.getName());
117 orderAndUser.getOrderId().toString(),
118 validation.setValidationResult(FAIL).build());
122 Topology topology = builder.build();
123 streams = new KafkaStreams(topology, properties);
124 streams.setUncaughtExceptionHandler((Thread t, Throwable e) ->
126 LOG.error("Unexpected error in thread {}: {}", t, e.toString());
133 LOG.error("Could not close KafkaStreams!", ex);