streams - Übungen - Microservices - Schritt 03
[demos/microservices] / validation-results / src / main / java / de / trion / microservices / validationresults / ValidationResultsService.java
diff --git a/validation-results/src/main/java/de/trion/microservices/validationresults/ValidationResultsService.java b/validation-results/src/main/java/de/trion/microservices/validationresults/ValidationResultsService.java
new file mode 100644 (file)
index 0000000..3b3d94f
--- /dev/null
@@ -0,0 +1,92 @@
+package de.trion.microservices.validationresults;
+
+
+import de.trion.microservices.avro.Order;
+import de.trion.microservices.avro.OrderState;
+import static de.trion.microservices.avro.OrderState.APPROVED;
+import static de.trion.microservices.avro.OrderState.DECLINED;
+import de.trion.microservices.avro.OrderValidation;
+import static de.trion.microservices.avro.OrderValidationResult.PASS;
+import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
+import java.time.Duration;
+import java.util.Properties;
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.kstream.JoinWindows;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+
+@Component
+public class ValidationResultsService
+{
+  final static Logger LOG = LoggerFactory.getLogger(ValidationResultsService.class);
+
+  private final String orders;
+  private final String validation;
+  private final KafkaStreams streams;
+
+
+  public ValidationResultsService(ApplicationProperties config)
+  {
+    orders = config.ordersTopic;
+    validation = config.validationTopic;
+
+    Properties properties = new Properties();
+    properties.put("bootstrap.servers", config.bootstrapServers);
+    properties.put("application.id", "validation-results");
+    properties.put("schema.registry.url", config.schemaRegistryUrl);
+    properties.put("default.key.serde", Serdes.String().getClass());
+    properties.put("default.value.serde", SpecificAvroSerde.class);
+
+    StreamsBuilder builder = new StreamsBuilder();
+    builder
+        .<String,Order>stream(orders)
+        .filter((id, order) -> order.getState() == OrderState.CREATED)
+        .join(
+            builder.<String,OrderValidation>stream(validation),
+            (order, outcome) ->
+            {
+              return
+                  Order
+                      .newBuilder(order)
+                      .setState(outcome.getValidationResult() == PASS ? APPROVED : DECLINED)
+                      .build();
+            },
+            JoinWindows.of(Duration.ofHours(1)))
+        .to(orders);
+
+    Topology topology = builder.build();
+    streams = new KafkaStreams(topology, properties);
+    streams.setUncaughtExceptionHandler((Thread t, Throwable e) ->
+    {
+      LOG.error("Unexpected error in thread {}: {}", t, e.toString());
+      try
+      {
+        streams.close();
+      }
+      catch (Exception ex)
+      {
+        LOG.error("Could not close KafkaStreams!", ex);
+      }
+    });
+  }
+
+
+  @PostConstruct
+  public void start()
+  {
+    streams.start();
+  }
+
+  @PreDestroy
+  public void stop()
+  {
+    streams.close();
+  }
+}