X-Git-Url: https://juplo.de/gitweb/?p=demos%2Fmicroservices;a=blobdiff_plain;f=validation-results%2Fsrc%2Fmain%2Fjava%2Fde%2Ftrion%2Fmicroservices%2Fvalidationresults%2FValidationResultsService.java;fp=validation-results%2Fsrc%2Fmain%2Fjava%2Fde%2Ftrion%2Fmicroservices%2Fvalidationresults%2FValidationResultsService.java;h=3b3d94fb8ab68a3d98898e995900090dacfa4371;hp=0000000000000000000000000000000000000000;hb=9c32239c9f2889c34499623329c9bf801d0a4288;hpb=4f784f887f530419d66700b3e4e379c7ff36340a diff --git a/validation-results/src/main/java/de/trion/microservices/validationresults/ValidationResultsService.java b/validation-results/src/main/java/de/trion/microservices/validationresults/ValidationResultsService.java new file mode 100644 index 0000000..3b3d94f --- /dev/null +++ b/validation-results/src/main/java/de/trion/microservices/validationresults/ValidationResultsService.java @@ -0,0 +1,92 @@ +package de.trion.microservices.validationresults; + + +import de.trion.microservices.avro.Order; +import de.trion.microservices.avro.OrderState; +import static de.trion.microservices.avro.OrderState.APPROVED; +import static de.trion.microservices.avro.OrderState.DECLINED; +import de.trion.microservices.avro.OrderValidation; +import static de.trion.microservices.avro.OrderValidationResult.PASS; +import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde; +import java.time.Duration; +import java.util.Properties; +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.kstream.JoinWindows; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + + +@Component +public class ValidationResultsService +{ + final static Logger LOG = LoggerFactory.getLogger(ValidationResultsService.class); + + private final String orders; + private final String validation; + private final KafkaStreams streams; + + + public ValidationResultsService(ApplicationProperties config) + { + orders = config.ordersTopic; + validation = config.validationTopic; + + Properties properties = new Properties(); + properties.put("bootstrap.servers", config.bootstrapServers); + properties.put("application.id", "validation-results"); + properties.put("schema.registry.url", config.schemaRegistryUrl); + properties.put("default.key.serde", Serdes.String().getClass()); + properties.put("default.value.serde", SpecificAvroSerde.class); + + StreamsBuilder builder = new StreamsBuilder(); + builder + .stream(orders) + .filter((id, order) -> order.getState() == OrderState.CREATED) + .join( + builder.stream(validation), + (order, outcome) -> + { + return + Order + .newBuilder(order) + .setState(outcome.getValidationResult() == PASS ? APPROVED : DECLINED) + .build(); + }, + JoinWindows.of(Duration.ofHours(1))) + .to(orders); + + Topology topology = builder.build(); + streams = new KafkaStreams(topology, properties); + streams.setUncaughtExceptionHandler((Thread t, Throwable e) -> + { + LOG.error("Unexpected error in thread {}: {}", t, e.toString()); + try + { + streams.close(); + } + catch (Exception ex) + { + LOG.error("Could not close KafkaStreams!", ex); + } + }); + } + + + @PostConstruct + public void start() + { + streams.start(); + } + + @PreDestroy + public void stop() + { + streams.close(); + } +}