streams - Übungen - Microservices - Schritt 03
[demos/microservices] / validation-results / src / main / java / de / trion / microservices / validationresults / ValidationResultsService.java
1 package de.trion.microservices.validationresults;
2
3
4 import de.trion.microservices.avro.Order;
5 import de.trion.microservices.avro.OrderState;
6 import static de.trion.microservices.avro.OrderState.APPROVED;
7 import static de.trion.microservices.avro.OrderState.DECLINED;
8 import de.trion.microservices.avro.OrderValidation;
9 import static de.trion.microservices.avro.OrderValidationResult.PASS;
10 import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
11 import java.time.Duration;
12 import java.util.Properties;
13 import javax.annotation.PostConstruct;
14 import javax.annotation.PreDestroy;
15 import org.apache.kafka.common.serialization.Serdes;
16 import org.apache.kafka.streams.KafkaStreams;
17 import org.apache.kafka.streams.StreamsBuilder;
18 import org.apache.kafka.streams.Topology;
19 import org.apache.kafka.streams.kstream.JoinWindows;
20 import org.slf4j.Logger;
21 import org.slf4j.LoggerFactory;
22 import org.springframework.stereotype.Component;
23
24
25 @Component
26 public class ValidationResultsService
27 {
28   final static Logger LOG = LoggerFactory.getLogger(ValidationResultsService.class);
29
30   private final String orders;
31   private final String validation;
32   private final KafkaStreams streams;
33
34
35   public ValidationResultsService(ApplicationProperties config)
36   {
37     orders = config.ordersTopic;
38     validation = config.validationTopic;
39
40     Properties properties = new Properties();
41     properties.put("bootstrap.servers", config.bootstrapServers);
42     properties.put("application.id", "validation-results");
43     properties.put("schema.registry.url", config.schemaRegistryUrl);
44     properties.put("default.key.serde", Serdes.String().getClass());
45     properties.put("default.value.serde", SpecificAvroSerde.class);
46
47     StreamsBuilder builder = new StreamsBuilder();
48     builder
49         .<String,Order>stream(orders)
50         .filter((id, order) -> order.getState() == OrderState.CREATED)
51         .join(
52             builder.<String,OrderValidation>stream(validation),
53             (order, outcome) ->
54             {
55               return
56                   Order
57                       .newBuilder(order)
58                       .setState(outcome.getValidationResult() == PASS ? APPROVED : DECLINED)
59                       .build();
60             },
61             JoinWindows.of(Duration.ofHours(1)))
62         .to(orders);
63
64     Topology topology = builder.build();
65     streams = new KafkaStreams(topology, properties);
66     streams.setUncaughtExceptionHandler((Thread t, Throwable e) ->
67     {
68       LOG.error("Unexpected error in thread {}: {}", t, e.toString());
69       try
70       {
71         streams.close();
72       }
73       catch (Exception ex)
74       {
75         LOG.error("Could not close KafkaStreams!", ex);
76       }
77     });
78   }
79
80
81   @PostConstruct
82   public void start()
83   {
84     streams.start();
85   }
86
87   @PreDestroy
88   public void stop()
89   {
90     streams.close();
91   }
92 }