TMP
[demos/microservices] / validate-user / src / main / java / de / trion / microservices / validateuser / ValidateUserService.java
1 package de.trion.microservices.validateuser;
2
3
4 import de.trion.microservices.avro.CustomerLevel;
5 import de.trion.microservices.avro.Order;
6 import de.trion.microservices.avro.OrderAndUser;
7 import de.trion.microservices.avro.OrderState;
8 import de.trion.microservices.avro.OrderValidation;
9 import de.trion.microservices.avro.OrderValidation.Builder;
10 import static de.trion.microservices.avro.OrderValidationResult.FAIL;
11 import static de.trion.microservices.avro.OrderValidationResult.PASS;
12 import static de.trion.microservices.avro.OrderValidationType.ORDER_USER_CHECK;
13 import de.trion.microservices.avro.User;
14 import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
15 import java.util.LinkedList;
16 import java.util.List;
17 import java.util.Properties;
18 import javax.annotation.PostConstruct;
19 import javax.annotation.PreDestroy;
20 import org.apache.kafka.common.serialization.Serdes;
21 import org.apache.kafka.streams.KafkaStreams;
22 import org.apache.kafka.streams.KeyValue;
23 import org.apache.kafka.streams.StreamsBuilder;
24 import org.apache.kafka.streams.Topology;
25 import org.apache.kafka.streams.Topology.AutoOffsetReset;
26 import org.apache.kafka.streams.kstream.Consumed;
27 import org.apache.kafka.streams.kstream.Joined;
28 import org.apache.kafka.streams.kstream.KTable;
29 import org.apache.kafka.streams.kstream.Materialized;
30 import org.apache.kafka.streams.kstream.Produced;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
33 import org.springframework.stereotype.Component;
34
35
36 @Component
37 public class ValidateUserService
38 {
39   final static Logger LOG = LoggerFactory.getLogger(ValidateUserService.class);
40
41   private final String orders;
42   private final String users;
43   private final String validation;
44   private final KafkaStreams streams;
45
46
47   public ValidateUserService(ApplicationProperties config)
48   {
49     orders = config.ordersTopic;
50     users = config.usersTopic;
51     validation = config.validationTopic;
52
53     Properties properties = new Properties();
54     properties.put("bootstrap.servers", config.bootstrapServers);
55     properties.put("application.id", "validate-user");
56     properties.put("schema.registry.url", config.schemaRegistryUrl);
57     properties.put("default.key.serde", Serdes.String().getClass());
58     properties.put("default.value.serde", SpecificAvroSerde.class);
59
60     StreamsBuilder builder = new StreamsBuilder();
61
62     KTable<Long, User> usersTable =
63         builder
64             .table(
65                 users,
66                 Consumed.with(Serdes.Long(), null, null, AutoOffsetReset.EARLIEST),
67                 Materialized.as(users));
68
69     builder
70         .<String,Order>stream(orders)
71         .filter((id, order) -> order.getState() == OrderState.CREATED)
72         .map((id,order) ->
73         {
74           return new KeyValue<>(order.getCustomerId(), id);
75         })
76         .through(users + "_" + orders, Produced.with(Serdes.Long(), Serdes.String()))
77         .leftJoin(
78             usersTable,
79             (order, user) ->
80             {
81               return
82                   OrderAndUser
83                       .newBuilder()
84                       .setOrderId(order)
85                       .setUser(user)
86                       .build();
87             },
88             Joined.keySerde(Serdes.Long()))
89         .map((id, orderAndUser) ->
90         {
91           List<CharSequence> messages = new LinkedList<>();
92
93           Builder validation =
94               OrderValidation
95                   .newBuilder()
96                   .setOrderId(orderAndUser.getOrderId())
97                   .setCheckType(ORDER_USER_CHECK)
98                   .setMessages(messages);
99
100           if (orderAndUser.getUser() == null)
101             return
102                 new KeyValue<>(
103                     orderAndUser.getOrderId().toString(),
104                     validation.setValidationResult(FAIL).build());
105
106           User user = orderAndUser.getUser();
107
108           if (user.getLevel() != CustomerLevel.UNWANTED)
109             return
110                 new KeyValue<>(
111                     orderAndUser.getOrderId().toString(),
112                     validation.setValidationResult(PASS).build());
113
114           messages.add("User is UNWANTED: " + user.getName());
115           return
116               new KeyValue<>(
117                   orderAndUser.getOrderId().toString(),
118                   validation.setValidationResult(FAIL).build());
119         })
120         .to(validation);
121
122     Topology topology = builder.build();
123     streams = new KafkaStreams(topology, properties);
124     streams.setUncaughtExceptionHandler((Thread t, Throwable e) ->
125     {
126       LOG.error("Unexpected error in thread {}: {}", t, e.toString());
127       try
128       {
129         streams.close();
130       }
131       catch (Exception ex)
132       {
133         LOG.error("Could not close KafkaStreams!", ex);
134       }
135     });
136   }
137
138
139   @PostConstruct
140   public void start()
141   {
142     streams.start();
143   }
144
145   @PreDestroy
146   public void stop()
147   {
148     streams.close();
149   }
150 }