X-Git-Url: https://juplo.de/gitweb/?p=demos%2Fmicroservices;a=blobdiff_plain;f=validate-order%2Fsrc%2Fmain%2Fjava%2Fde%2Ftrion%2Fmicroservices%2Fvalidateorder%2FValidateOrderService.java;fp=validate-order%2Fsrc%2Fmain%2Fjava%2Fde%2Ftrion%2Fmicroservices%2Fvalidateorder%2FValidateOrderService.java;h=1ad46ce2ded86aee9d551d9b2a210ea73fcee2f2;hp=0000000000000000000000000000000000000000;hb=4f784f887f530419d66700b3e4e379c7ff36340a;hpb=1d74e253330fe4e04db523cc504ce64cb3619304 diff --git a/validate-order/src/main/java/de/trion/microservices/validateorder/ValidateOrderService.java b/validate-order/src/main/java/de/trion/microservices/validateorder/ValidateOrderService.java new file mode 100644 index 0000000..1ad46ce --- /dev/null +++ b/validate-order/src/main/java/de/trion/microservices/validateorder/ValidateOrderService.java @@ -0,0 +1,101 @@ +package de.trion.microservices.validateorder; + + +import de.trion.microservices.avro.Order; +import de.trion.microservices.avro.OrderState; +import de.trion.microservices.avro.OrderValidation; +import static de.trion.microservices.avro.OrderValidationResult.FAIL; +import static de.trion.microservices.avro.OrderValidationResult.PASS; +import static de.trion.microservices.avro.OrderValidationType.ORDER_DETAILS_CHECK; +import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde; +import java.util.LinkedList; +import java.util.List; +import java.util.Properties; +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.Topology; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + + +@Component +public class ValidateOrderService +{ + final static Logger LOG = LoggerFactory.getLogger(ValidateOrderService.class); + + private final String orders; + private final String validation; + private final KafkaStreams streams; + + + public ValidateOrderService(ApplicationProperties config) + { + orders = config.ordersTopic; + validation = config.validationTopic; + + Properties properties = new Properties(); + properties.put("bootstrap.servers", config.bootstrapServers); + properties.put("application.id", "validate-order"); + properties.put("schema.registry.url", config.schemaRegistryUrl); + properties.put("default.key.serde", Serdes.String().getClass()); + properties.put("default.value.serde", SpecificAvroSerde.class); + + StreamsBuilder builder = new StreamsBuilder(); + builder + .stream(orders) + .filter((id, order) -> order.getState() == OrderState.CREATED) + .mapValues((order) -> + { + List messages = new LinkedList<>(); + + if (order.getCustomerId() < 1) + messages.add("Customer-ID must be greater than 0"); + if (order.getProductId() < 1) + messages.add("Product-ID must be greater than 0"); + if (order.getQuantity() < 1) + messages.add("The ordered quantity must be greater than 0"); + + return + OrderValidation + .newBuilder() + .setOrderId(order.getId()) + .setCheckType(ORDER_DETAILS_CHECK) + .setValidationResult(messages.isEmpty() ? PASS : FAIL) + .setMessages(messages) + .build(); + }) + .to(validation); + + Topology topology = builder.build(); + streams = new KafkaStreams(topology, properties); + streams.setUncaughtExceptionHandler((Thread t, Throwable e) -> + { + LOG.error("Unexpected error in thread {}: {}", t, e.toString()); + try + { + streams.close(); + } + catch (Exception ex) + { + LOG.error("Could not close KafkaStreams!", ex); + } + }); + } + + + @PostConstruct + public void start() + { + streams.start(); + } + + @PreDestroy + public void stop() + { + streams.close(); + } +}