X-Git-Url: https://juplo.de/gitweb/?p=demos%2Fmicroservices;a=blobdiff_plain;f=validate-user%2Fsrc%2Fmain%2Fjava%2Fde%2Ftrion%2Fmicroservices%2Fvalidateuser%2FValidateUserService.java;fp=validate-user%2Fsrc%2Fmain%2Fjava%2Fde%2Ftrion%2Fmicroservices%2Fvalidateuser%2FValidateUserService.java;h=479cf7e090aa8af4267070622794124d143fc532;hp=0000000000000000000000000000000000000000;hb=a4954bc66ddc26c5eae70a1f3bc482aa61114ed9;hpb=0a43a689c13eb4404ce662bc7a27160899b2bb23 diff --git a/validate-user/src/main/java/de/trion/microservices/validateuser/ValidateUserService.java b/validate-user/src/main/java/de/trion/microservices/validateuser/ValidateUserService.java new file mode 100644 index 0000000..479cf7e --- /dev/null +++ b/validate-user/src/main/java/de/trion/microservices/validateuser/ValidateUserService.java @@ -0,0 +1,77 @@ +package de.trion.microservices.validateorder; + + +import de.trion.microservices.avro.Order; +import de.trion.microservices.avro.OrderState; +import de.trion.microservices.avro.OrderValidation; +import static de.trion.microservices.avro.OrderValidationResult.FAIL; +import static de.trion.microservices.avro.OrderValidationResult.PASS; +import static de.trion.microservices.avro.OrderValidationType.ORDER_DETAILS_CHECK; +import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde; +import java.util.LinkedList; +import java.util.List; +import java.util.Properties; +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.Topology; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + + +@Component +public class ValidateUserService +{ + final static Logger LOG = LoggerFactory.getLogger(ValidateUserService.class); + + private final String orders; + private final String validation; + private final KafkaStreams streams; + + + public ValidateUserService(ApplicationProperties config) + { + orders = config.ordersTopic; + validation = config.validationTopic; + + Properties properties = new Properties(); + properties.put("bootstrap.servers", config.bootstrapServers); + properties.put("application.id", "validate-user"); + properties.put("schema.registry.url", config.schemaRegistryUrl); + properties.put("default.key.serde", Serdes.String().getClass()); + properties.put("default.value.serde", SpecificAvroSerde.class); + + StreamsBuilder builder = new StreamsBuilder(); + + Topology topology = builder.build(); + streams = new KafkaStreams(topology, properties); + streams.setUncaughtExceptionHandler((Thread t, Throwable e) -> + { + LOG.error("Unexpected error in thread {}: {}", t, e.toString()); + try + { + streams.close(); + } + catch (Exception ex) + { + LOG.error("Could not close KafkaStreams!", ex); + } + }); + } + + + @PostConstruct + public void start() + { + streams.start(); + } + + @PreDestroy + public void stop() + { + streams.close(); + } +}