+package de.trion.microservices.validationresults;
+
+
+import de.trion.microservices.avro.Order;
+import de.trion.microservices.avro.OrderState;
+import static de.trion.microservices.avro.OrderState.APPROVED;
+import static de.trion.microservices.avro.OrderState.DECLINED;
+import de.trion.microservices.avro.OrderValidation;
+import static de.trion.microservices.avro.OrderValidationResult.PASS;
+import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
+import java.time.Duration;
+import java.util.Properties;
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.kstream.JoinWindows;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+
+@Component
+public class ValidationResultsService
+{
+ final static Logger LOG = LoggerFactory.getLogger(ValidationResultsService.class);
+
+ private final String orders;
+ private final String validation;
+ private final KafkaStreams streams;
+
+
+ public ValidationResultsService(ApplicationProperties config)
+ {
+ orders = config.ordersTopic;
+ validation = config.validationTopic;
+
+ Properties properties = new Properties();
+ properties.put("bootstrap.servers", config.bootstrapServers);
+ properties.put("application.id", "validation-results");
+ properties.put("schema.registry.url", config.schemaRegistryUrl);
+ properties.put("default.key.serde", Serdes.String().getClass());
+ properties.put("default.value.serde", SpecificAvroSerde.class);
+
+ StreamsBuilder builder = new StreamsBuilder();
+ builder
+ .<String,Order>stream(orders)
+ .filter((id, order) -> order.getState() == OrderState.CREATED)
+ .join(
+ builder.<String,OrderValidation>stream(validation),
+ (order, outcome) ->
+ {
+ return
+ Order
+ .newBuilder(order)
+ .setState(outcome.getValidationResult() == PASS ? APPROVED : DECLINED)
+ .build();
+ },
+ JoinWindows.of(Duration.ofHours(1)))
+ .to(orders);
+
+ Topology topology = builder.build();
+ streams = new KafkaStreams(topology, properties);
+ streams.setUncaughtExceptionHandler((Thread t, Throwable e) ->
+ {
+ LOG.error("Unexpected error in thread {}: {}", t, e.toString());
+ try
+ {
+ streams.close();
+ }
+ catch (Exception ex)
+ {
+ LOG.error("Could not close KafkaStreams!", ex);
+ }
+ });
+ }
+
+
+ @PostConstruct
+ public void start()
+ {
+ streams.start();
+ }
+
+ @PreDestroy
+ public void stop()
+ {
+ streams.close();
+ }
+}