streams - Übungen - Microservices - Schritt 03
[demos/microservices] / validate-order / src / main / java / de / trion / microservices / validateorder / ValidateOrderService.java
1 package de.trion.microservices.validateorder;
2
3
4 import de.trion.microservices.avro.Order;
5 import de.trion.microservices.avro.OrderState;
6 import de.trion.microservices.avro.OrderValidation;
7 import static de.trion.microservices.avro.OrderValidationResult.FAIL;
8 import static de.trion.microservices.avro.OrderValidationResult.PASS;
9 import static de.trion.microservices.avro.OrderValidationType.ORDER_DETAILS_CHECK;
10 import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
11 import java.util.LinkedList;
12 import java.util.List;
13 import java.util.Properties;
14 import javax.annotation.PostConstruct;
15 import javax.annotation.PreDestroy;
16 import org.apache.kafka.common.serialization.Serdes;
17 import org.apache.kafka.streams.KafkaStreams;
18 import org.apache.kafka.streams.StreamsBuilder;
19 import org.apache.kafka.streams.Topology;
20 import org.slf4j.Logger;
21 import org.slf4j.LoggerFactory;
22 import org.springframework.stereotype.Component;
23
24
25 @Component
26 public class ValidateOrderService
27 {
28   final static Logger LOG = LoggerFactory.getLogger(ValidateOrderService.class);
29
30   private final String orders;
31   private final String validation;
32   private final KafkaStreams streams;
33
34
35   public ValidateOrderService(ApplicationProperties config)
36   {
37     orders = config.ordersTopic;
38     validation = config.validationTopic;
39
40     Properties properties = new Properties();
41     properties.put("bootstrap.servers", config.bootstrapServers);
42     properties.put("application.id", "validate-order");
43     properties.put("schema.registry.url", config.schemaRegistryUrl);
44     properties.put("default.key.serde", Serdes.String().getClass());
45     properties.put("default.value.serde", SpecificAvroSerde.class);
46
47     StreamsBuilder builder = new StreamsBuilder();
48     builder
49         .<String,Order>stream(orders)
50         .filter((id, order) -> order.getState() == OrderState.CREATED)
51         .mapValues((order) ->
52         {
53           List<CharSequence> messages = new LinkedList<>();
54
55           if (order.getCustomerId() < 1)
56             messages.add("Customer-ID must be greater than 0");
57           if (order.getProductId() < 1)
58             messages.add("Product-ID must be greater than 0");
59           if (order.getQuantity() < 1)
60             messages.add("The ordered quantity must be greater than 0");
61
62           return
63               OrderValidation
64                   .newBuilder()
65                   .setOrderId(order.getId())
66                   .setCheckType(ORDER_DETAILS_CHECK)
67                   .setValidationResult(messages.isEmpty() ? PASS : FAIL)
68                   .setMessages(messages)
69                   .build();
70         })
71         .to(validation);
72
73     Topology topology = builder.build();
74     streams = new KafkaStreams(topology, properties);
75     streams.setUncaughtExceptionHandler((Thread t, Throwable e) ->
76     {
77       LOG.error("Unexpected error in thread {}: {}", t, e.toString());
78       try
79       {
80         streams.close();
81       }
82       catch (Exception ex)
83       {
84         LOG.error("Could not close KafkaStreams!", ex);
85       }
86     });
87   }
88
89
90   @PostConstruct
91   public void start()
92   {
93     streams.start();
94   }
95
96   @PreDestroy
97   public void stop()
98   {
99     streams.close();
100   }
101 }