From: Kai Moritz Date: Fri, 11 Jun 2021 11:17:02 +0000 (+0200) Subject: Init X-Git-Tag: wip-initialer-commit~26 X-Git-Url: https://juplo.de/gitweb/?p=demos%2Fkafka%2Fdemos-kafka-payment-system-transfer;a=commitdiff_plain;h=178ce6ffc044415eb00f250aed2f9b1999bd47d1 Init --- 178ce6ffc044415eb00f250aed2f9b1999bd47d1 diff --git a/README.sh b/README.sh new file mode 100755 index 0000000..1d5eb98 --- /dev/null +++ b/README.sh @@ -0,0 +1,37 @@ +#!/bin/bash + +if [ "$1" = "cleanup" ] +then + docker-compose down + mvn clean + rm -rvf */src/main/java/de/trion/microservices/avro + exit +fi + +mvn package || exit 1 +if [ ! -e take-order/target/BUILD ] || [ "$(find take-order/target/classes/ -anewer take-order/target/BUILD | grep take-order/target/classes/de )" != "" ] +then + docker build -t trion/take-order-service:01 take-order + touch take-order/target/BUILD +fi +if [ "$1" = "build" ]; then exit; fi + +trap 'kill $(jobs -p)' EXIT + +docker container start toolbox +docker-compose up -d zookeeper kafka schema-registry + +while ! [[ $(zookeeper-shell zookeeper:2181 ls /brokers/ids 2> /dev/null) =~ 1001 ]]; do echo "Waiting for kafka..."; sleep 1; done + +kafka-topics --zookeeper zookeeper:2181 --if-not-exists --create --replication-factor 1 --partitions 5 --topic orders + +docker-compose up -d take-order + +kafka-avro-console-consumer --bootstrap-server kafka:9092 --topic orders & +while ! [[ $(http 0:8091/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for take-order..."; sleep 1; done + +http -v post 0:8091/orders Accept:*/* id=1 customerId=2 productId=234 quantity=5 +http -v post 0:8091/orders Accept:*/* id=1 customerId=2 productId=234 quantity= +http -v post 0:8091/orders Accept:*/* id=1 customerId=2 productId=234 quantity=-5 +http -v post 0:8091/orders Accept:*/* id=1 customerId=2 productId=-234 quantiyt=5 +http -v post 0:8091/orders Accept:*/* customerId=2 productId=234 quantity=5 diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..e6655e7 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,48 @@ +version: '3.2' +services: + zookeeper: + image: confluentinc/cp-zookeeper:6.2.0 + ports: + - "2181:2181" + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + + kafka: + image: confluentinc/cp-kafka:6.2.0 + ports: + - 9092:9092 + environment: + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT + KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9093,OUTSIDE://localhost:9092 + KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + depends_on: + - zookeeper + + schema-registry: + image: confluentinc/cp-schema-registry:6.2.0 + hostname: schema-registry + ports: + - "8081:8081" + environment: + SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka:9093 + SCHEMA_REGISTRY_HOST_NAME: schema-registry + SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081 + depends_on: + - zookeeper + - kafka + + transfer + image: juplo/transfer-service:mvp + ports: + - "8091:8080" + depends_on: + - zookeeper + - kafka + - schema-registry + +networks: + default: + external: + name: juplo diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..43a1e8e --- /dev/null +++ b/pom.xml @@ -0,0 +1,15 @@ + + + 4.0.0 + + de.trion.kafka.payment + payment-bom + Payment System Example + 1.0.0 + pom + + + transfer + + + diff --git a/transfer/.dockerignore b/transfer/.dockerignore new file mode 100644 index 0000000..1ad9963 --- /dev/null +++ b/transfer/.dockerignore @@ -0,0 +1,2 @@ +* +!target/*.jar diff --git a/transfer/Dockerfile b/transfer/Dockerfile new file mode 100644 index 0000000..cd6a95b --- /dev/null +++ b/transfer/Dockerfile @@ -0,0 +1,4 @@ +FROM openjdk:8-jre-slim +COPY target/take-order-01-0-SNAPSHOT.jar /opt/ +EXPOSE 8080 +CMD ["java", "-jar", "/opt/take-order-01-0-SNAPSHOT.jar"] diff --git a/transfer/order.avsc b/transfer/order.avsc new file mode 100644 index 0000000..f4da708 --- /dev/null +++ b/transfer/order.avsc @@ -0,0 +1,22 @@ +[ + { + "namespace": "de.trion.microservices.avro", + "type": "enum", + "name": "OrderState", + "symbols" : [ "CREATED" ], + "default" : "CREATED" + }, + { + "namespace": "de.trion.microservices.avro", + "type": "record", + "name": "Order", + "fields": [ + { "name": "id", "type": "string" }, + { "name": "state", "type": "OrderState" }, + { "name": "customerId", "type": "long" }, + { "name": "orderId", "type": "long" }, + { "name": "productId", "type": "long" }, + { "name": "quantity", "type": "int" } + ] + } +] diff --git a/transfer/pom.xml b/transfer/pom.xml new file mode 100644 index 0000000..58fd758 --- /dev/null +++ b/transfer/pom.xml @@ -0,0 +1,81 @@ + + + 4.0.0 + + + org.springframework.boot + spring-boot-starter-parent + 2.5.1.RELEASE + + + + de.juplo.kafka.payment + transfer + Transfer Service + 1.0.0 + + + 1.10.2 + 6.2.0 + 2.8.0 + + + + + org.springframework.boot + spring-boot-starter-actuator + + + org.springframework.boot + spring-boot-starter-web + + + org.apache.kafka + kafka-client + + + org.apache.avro + avro + ${avro.version} + + + io.confluent + kafka-streams-avro-serde + ${confluent.version} + + + + + + confluent + https://packages.confluent.io/maven/ + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + org.apache.avro + avro-maven-plugin + ${avro.version} + + + generate-sources + + schema + + + ${project.basedir}/ + ${project.basedir}/src/main/java/ + + + + + + + + diff --git a/transfer/src/main/java/de/juplo/kafka/payment/transfer/OrderBean.java b/transfer/src/main/java/de/juplo/kafka/payment/transfer/OrderBean.java new file mode 100644 index 0000000..6c724cc --- /dev/null +++ b/transfer/src/main/java/de/juplo/kafka/payment/transfer/OrderBean.java @@ -0,0 +1,84 @@ +package de.juplo.kafka.payment.transfer; + +import javax.validation.constraints.NotNull; + + +/** + * Simple DTO used by the REST interface + */ +public class OrderBean +{ + @NotNull(message = "Cannot be null") + private long id; + @NotNull(message = "Cannot be null") + private long customerId; + @NotNull(message = "Cannot be null") + private long productId; + @NotNull(message = "Cannot be null") + private int quantity; + + + public OrderBean() {} + + public OrderBean( + final long id, + final long customerId, + final long productId, + final int quantity) + { + this.id = id; + this.customerId = customerId; + this.productId = productId; + this.quantity = quantity; + } + + public long getId() + { + return id; + } + + public long getCustomerId() + { + return customerId; + } + + public long getProductId() + { + return productId; + } + + public int getQuantity() + { + return quantity; + } + + + @Override + public boolean equals(final Object o) { + if (this == o) + return true; + if (o == null || this.getClass() != o.getClass()) + return false; + + final OrderBean orderBean = (OrderBean) o; + + return this.customerId == orderBean.customerId; + } + + @Override + public String toString() + { + return "{" + + "id=" + id + + ", customerId=" + customerId + + ", productId=" + productId + + ", quantity=" + quantity + + '}'; + } + + @Override + public int hashCode() + { + return Long.hashCode(this.id); + } +} diff --git a/transfer/src/main/java/de/juplo/kafka/payment/transfer/TransferService.java b/transfer/src/main/java/de/juplo/kafka/payment/transfer/TransferService.java new file mode 100644 index 0000000..9da09a9 --- /dev/null +++ b/transfer/src/main/java/de/juplo/kafka/payment/transfer/TransferService.java @@ -0,0 +1,86 @@ +package de.juplo.kafka.payment.transfer; + + +import de.trion.microservices.avro.Order; +import de.trion.microservices.avro.OrderState; +import java.net.URI; +import java.util.UUID; +import javax.validation.Valid; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.context.request.async.DeferredResult; + + +@RestController +public class TransferService +{ + private final static Logger LOG = LoggerFactory.getLogger(TransferService.class); + + private final KafkaProducer producer; + private final String topic; + private final String path; + + + TransferService( + final KafkaProducer producer, + final TransferServiceProperties properties) + { + this.producer = producer; + this.topic = properties.getTopic(); + this.path = properties.getPath(); + } + + + @PostMapping( + path = "/orders", + consumes = MediaType.APPLICATION_JSON_UTF8_VALUE, + produces = MediaType.TEXT_PLAIN_VALUE) + public DeferredResult> placeOrder(@Valid @RequestBody OrderBean order) + { + DeferredResult> result = new DeferredResult<>(); + + try + { + UUID uuid = UUID.randomUUID(); + ProducerRecord record = + new ProducerRecord<>( + topic, + uuid, + Order + .newBuilder() + .setId(uuid.toString()) + .setState(OrderState.CREATED) + .setCustomerId(order.getCustomerId()) + .setOrderId(order.getId()) + .setProductId(order.getProductId()) + .setQuantity(order.getQuantity()) + .build()); + + producer.send(record, (metadata, exception) -> + { + if (exception != null) + { + LOG.error("Could not place order {}: {}", order, exception.toString()); + result.setErrorResult(exception); + return; + } + + result.setResult(ResponseEntity.created(URI.create(path + uuid)).build()); + }); + } + catch (Exception e) + { + LOG.error("Unexpected exception!", e); + result.setErrorResult(e); + } + + return result; + } +} diff --git a/transfer/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java b/transfer/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java new file mode 100644 index 0000000..4a55c85 --- /dev/null +++ b/transfer/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java @@ -0,0 +1,41 @@ +package de.juplo.kafka.payment.transfer; + + +import de.trion.microservices.avro.Order; +import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer; +import java.util.Properties; +import java.util.UUID; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.common.serialization.UUIDSerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; + + +@SpringBootApplication +@EnableConfigurationProperties(TransferServiceProperties.class) +public class TransferServiceApplication +{ + private final static Logger LOG = LoggerFactory.getLogger(TransferServiceApplication.class); + + + @Bean(destroyMethod = "close") + KafkaProducer producer(TransferServiceProperties properties) + { + Properties props = new Properties(); + props.put("bootstrap.servers", properties.bootstrapServers); + props.put("schema.registry.url", properties.schemaRegistryUrl); + props.put("key.serializer", UUIDSerializer.class.getName()); + props.put("value.serializer", SpecificAvroSerializer.class.getName()); + + return new KafkaProducer<>(props); + } + + public static void main(String[] args) + { + SpringApplication.run(TransferServiceApplication.class, args); + } +} diff --git a/transfer/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceProperties.java b/transfer/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceProperties.java new file mode 100644 index 0000000..8a50001 --- /dev/null +++ b/transfer/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceProperties.java @@ -0,0 +1,55 @@ +package de.juplo.kafka.payment.transfer; + + +import org.springframework.boot.context.properties.ConfigurationProperties; + + +@ConfigurationProperties("take-order") +public class TransferServiceProperties +{ + String bootstrapServers = "kafka:9092"; + String schemaRegistryUrl = "http://schema-registry:8081"; + String topic = "orders"; + String path = "http://details:8092/orders/"; + + + public String getBootstrapServers() + { + return bootstrapServers; + } + + public void setBootstrapServers(String bootstrapServers) + { + this.bootstrapServers = bootstrapServers; + } + + public String getSchemaRegistryUrl() + { + return schemaRegistryUrl; + } + + public void setSchemaRegistryUrl(String schemaRegistryUrl) + { + this.schemaRegistryUrl = schemaRegistryUrl; + } + + public String getTopic() + { + return topic; + } + + public void setTopic(String topic) + { + this.topic = topic; + } + + public String getPath() + { + return path; + } + + public void setPath(String path) + { + this.path = path; + } +} diff --git a/transfer/src/main/resources/application.properties b/transfer/src/main/resources/application.properties new file mode 100644 index 0000000..f22f985 --- /dev/null +++ b/transfer/src/main/resources/application.properties @@ -0,0 +1,3 @@ +management.endpoints.web.exposure.include=* + +logging.level.de.trion=info