--- /dev/null
+#!/bin/bash
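+#
+# Builds the transfer-service, starts the Kafka setup via docker-compose and
+# sends some example requests. "cleanup" tears everything down again, "build"
+# stops after the Docker image has been built.
+#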
+
+if [ "$1" = "cleanup" ]
+then
+ docker-compose down
+ mvn clean
+ rm -rvf */src/main/java/de/trion/microservices/avro
+ exit
+fi
+
+mvn package || exit 1
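+# Only rebuild the Docker image, if the compiled classes have changed since the last build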
+if [ ! -e transfer/target/BUILD ] || [ "$(find transfer/target/classes/ -anewer transfer/target/BUILD | grep transfer/target/classes/de )" != "" ]
+then
+ docker build -t juplo/transfer-service:mvp transfer
+ touch transfer/target/BUILD
+fi
+if [ "$1" = "build" ]; then exit; fi
+
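+# Stop all background jobs (i.e. the console consumer) when the script exits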
+trap 'kill $(jobs -p)' EXIT
+
+docker container start toolbox
+docker-compose up -d zookeeper kafka schema-registry
+
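+# Wait until the broker has registered itself in ZooKeeper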
+while ! [[ $(zookeeper-shell zookeeper:2181 ls /brokers/ids 2> /dev/null) =~ 1001 ]]; do echo "Waiting for kafka..."; sleep 1; done
+
+kafka-topics --zookeeper zookeeper:2181 --if-not-exists --create --replication-factor 1 --partitions 5 --topic orders
+
+docker-compose up -d transfer
+
+kafka-avro-console-consumer --bootstrap-server kafka:9093 --property schema.registry.url=http://schema-registry:8081 --topic orders &
+while ! [[ $(http 0:8091/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for transfer..."; sleep 1; done
+
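+# A valid example request, followed by several incomplete or malformed ones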
+http -v post 0:8091/orders Accept:*/* id=1 customerId=2 productId=234 quantity=5
+http -v post 0:8091/orders Accept:*/* id=1 customerId=2 productId=234 quantity=
+http -v post 0:8091/orders Accept:*/* id=1 customerId=2 productId=234 quantity=-5
+http -v post 0:8091/orders Accept:*/* id=1 customerId=2 productId=-234 quantiyt=5
+http -v post 0:8091/orders Accept:*/* customerId=2 productId=234 quantity=5
--- /dev/null
+version: '3.2'
+services:
+ zookeeper:
+ image: confluentinc/cp-zookeeper:6.2.0
+ ports:
+ - "2181:2181"
+ environment:
+ ZOOKEEPER_CLIENT_PORT: 2181
+
+ kafka:
+ image: confluentinc/cp-kafka:6.2.0
+ ports:
+ - "9092:9092"
+ environment:
+ KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
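+ # INSIDE is used by clients within the Docker network (kafka:9093),
+ # OUTSIDE by clients on the host (localhost:9092)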
+ KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
+ KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9093,OUTSIDE://localhost:9092
+ KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
+ KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+ depends_on:
+ - zookeeper
+
+ schema-registry:
+ image: confluentinc/cp-schema-registry:6.2.0
+ hostname: schema-registry
+ ports:
+ - "8081:8081"
+ environment:
+ SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka:9093
+ SCHEMA_REGISTRY_HOST_NAME: schema-registry
+ SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
+ depends_on:
+ - zookeeper
+ - kafka
+
+ transfer:
+ image: juplo/transfer-service:mvp
+ ports:
+ - "8091:8080"
+ depends_on:
+ - zookeeper
+ - kafka
+ - schema-registry
+
+networks:
+ default:
+ external:
+ name: juplo
--- /dev/null
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>de.juplo.kafka.payment</groupId>
+ <artifactId>payment-bom</artifactId>
+ <name>Payment System Example</name>
+ <version>1.0.0</version>
+ <packaging>pom</packaging>
+
+ <modules>
+ <module>transfer</module>
+ </modules>
+
+</project>
--- /dev/null
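+# Exclude everything but the JAR from the Docker build-context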
+*
+!target/*.jar
--- /dev/null
+FROM openjdk:8-jre-slim
+COPY target/transfer-1.0.0.jar /opt/
+EXPOSE 8080
+CMD ["java", "-jar", "/opt/transfer-1.0.0.jar"]
--- /dev/null
+[
+ {
+ "namespace": "de.trion.microservices.avro",
+ "type": "enum",
+ "name": "OrderState",
+ "symbols" : [ "CREATED" ],
+ "default" : "CREATED"
+ },
+ {
+ "namespace": "de.trion.microservices.avro",
+ "type": "record",
+ "name": "Order",
+ "fields": [
+ { "name": "id", "type": "string" },
+ { "name": "state", "type": "OrderState" },
+ { "name": "customerId", "type": "long" },
+ { "name": "orderId", "type": "long" },
+ { "name": "productId", "type": "long" },
+ { "name": "quantity", "type": "int" }
+ ]
+ }
+]
--- /dev/null
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-parent</artifactId>
+ <version>2.5.1</version>
+ <relativePath/>
+ </parent>
+
+ <groupId>de.juplo.kafka.payment</groupId>
+ <artifactId>transfer</artifactId>
+ <name>Transfer Service</name>
+ <version>1.0.0</version>
+
+ <properties>
+ <avro.version>1.10.2</avro.version>
+ <confluent.version>6.2.0</confluent.version>
+ <kafka.version>2.8.0</kafka.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-actuator</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-web</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-validation</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ <version>${avro.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.confluent</groupId>
+ <artifactId>kafka-streams-avro-serde</artifactId>
+ <version>${confluent.version}</version>
+ </dependency>
+ </dependencies>
+
+ <repositories>
+ <repository>
+ <id>confluent</id>
+ <url>https://packages.confluent.io/maven/</url>
+ </repository>
+ </repositories>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-maven-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro-maven-plugin</artifactId>
+ <version>${avro.version}</version>
+ <executions>
+ <execution>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>schema</goal>
+ </goals>
+ <configuration>
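+ <!-- Reads the *.avsc files from the root of the module and generates the
+ Java classes into src/main/java (the cleanup-part of README.sh deletes
+ the generated sources again) -->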
+ <sourceDirectory>${project.basedir}/</sourceDirectory>
+ <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
--- /dev/null
+package de.juplo.kafka.payment.transfer;
+
+import java.util.Objects;
+import javax.validation.constraints.NotNull;
+
+
+/**
+ * Simple DTO used by the REST interface
+ */
+public class OrderBean
+{
+ // Wrapper types instead of primitives, so that @NotNull can actually detect missing values
+ @NotNull(message = "Cannot be null")
+ private Long id;
+ @NotNull(message = "Cannot be null")
+ private Long customerId;
+ @NotNull(message = "Cannot be null")
+ private Long productId;
+ @NotNull(message = "Cannot be null")
+ private Integer quantity;
+
+
+ // Jackson binds the request body through this constructor: Spring Boot
+ // compiles with -parameters, so the parameter names are available at runtime
+ public OrderBean(
+ final Long id,
+ final Long customerId,
+ final Long productId,
+ final Integer quantity)
+ {
+ this.id = id;
+ this.customerId = customerId;
+ this.productId = productId;
+ this.quantity = quantity;
+ }
+
+ public Long getId()
+ {
+ return id;
+ }
+
+ public Long getCustomerId()
+ {
+ return customerId;
+ }
+
+ public Long getProductId()
+ {
+ return productId;
+ }
+
+ public Integer getQuantity()
+ {
+ return quantity;
+ }
+
+
+ @Override
+ public boolean equals(final Object o)
+ {
+ if (this == o)
+ return true;
+ if (o == null || this.getClass() != o.getClass())
+ return false;
+
+ final OrderBean orderBean = (OrderBean) o;
+
+ // Compare the id, so that equals() stays consistent with hashCode()
+ return Objects.equals(this.id, orderBean.id);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "{" +
+ "id=" + id +
+ ", customerId=" + customerId +
+ ", productId=" + productId +
+ ", quantity=" + quantity +
+ '}';
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hashCode(this.id);
+ }
+}
--- /dev/null
+package de.juplo.kafka.payment.transfer;
+
+
+import de.trion.microservices.avro.Order;
+import de.trion.microservices.avro.OrderState;
+import java.net.URI;
+import java.util.UUID;
+import javax.validation.Valid;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.http.MediaType;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RestController;
+import org.springframework.web.context.request.async.DeferredResult;
+
+
+@RestController
+public class TransferService
+{
+ private final static Logger LOG = LoggerFactory.getLogger(TransferService.class);
+
+ private final KafkaProducer<UUID,Order> producer;
+ private final String topic;
+ private final String path;
+
+
+ TransferService(
+ final KafkaProducer<UUID,Order> producer,
+ final TransferServiceProperties properties)
+ {
+ this.producer = producer;
+ this.topic = properties.getTopic();
+ this.path = properties.getPath();
+ }
+
+
+ @PostMapping(
+ path = "/orders",
+ consumes = MediaType.APPLICATION_JSON_VALUE,
+ produces = MediaType.TEXT_PLAIN_VALUE)
+ public DeferredResult<ResponseEntity<?>> placeOrder(@Valid @RequestBody OrderBean order)
+ {
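+ // The DeferredResult releases the servlet-thread immediately;
+ // the response is produced later, in the callback of the send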
+ DeferredResult<ResponseEntity<?>> result = new DeferredResult<>();
+
+ try
+ {
+ UUID uuid = UUID.randomUUID();
+ ProducerRecord<UUID, Order> record =
+ new ProducerRecord<>(
+ topic,
+ uuid,
+ Order
+ .newBuilder()
+ .setId(uuid.toString())
+ .setState(OrderState.CREATED)
+ .setCustomerId(order.getCustomerId())
+ .setOrderId(order.getId())
+ .setProductId(order.getProductId())
+ .setQuantity(order.getQuantity())
+ .build());
+
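+ // Send the order asynchronously; the callback is executed, as soon as the
+ // broker has acknowledged (or rejected) the record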
+ producer.send(record, (metadata, exception) ->
+ {
+ if (exception != null)
+ {
+ LOG.error("Could not place order {}: {}", order, exception.toString());
+ result.setErrorResult(exception);
+ return;
+ }
+
+ result.setResult(ResponseEntity.created(URI.create(path + uuid)).build());
+ });
+ }
+ catch (Exception e)
+ {
+ LOG.error("Unexpected exception!", e);
+ result.setErrorResult(e);
+ }
+
+ return result;
+ }
+}
--- /dev/null
+package de.juplo.kafka.payment.transfer;
+
+
+import de.trion.microservices.avro.Order;
+import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer;
+import java.util.Properties;
+import java.util.UUID;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.common.serialization.UUIDSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+
+
+@SpringBootApplication
+@EnableConfigurationProperties(TransferServiceProperties.class)
+public class TransferServiceApplication
+{
+ private final static Logger LOG = LoggerFactory.getLogger(TransferServiceApplication.class);
+
+
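+ // A single KafkaProducer instance is shared by all requests (KafkaProducer is thread-safe)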
+ @Bean(destroyMethod = "close")
+ KafkaProducer<UUID,Order> producer(TransferServiceProperties properties)
+ {
+ Properties props = new Properties();
+ props.put("bootstrap.servers", properties.getBootstrapServers());
+ // Needed by the SpecificAvroSerializer to look up/register the schema
+ props.put("schema.registry.url", properties.getSchemaRegistryUrl());
+ props.put("key.serializer", UUIDSerializer.class.getName());
+ props.put("value.serializer", SpecificAvroSerializer.class.getName());
+
+ return new KafkaProducer<>(props);
+ }
+
+ public static void main(String[] args)
+ {
+ SpringApplication.run(TransferServiceApplication.class, args);
+ }
+}
--- /dev/null
+package de.juplo.kafka.payment.transfer;
+
+
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+
+@ConfigurationProperties("take-order")
+public class TransferServiceProperties
+{
+ String bootstrapServers = "kafka:9092";
+ String schemaRegistryUrl = "http://schema-registry:8081";
+ String topic = "orders";
+ String path = "http://details:8092/orders/";
+
+
+ public String getBootstrapServers()
+ {
+ return bootstrapServers;
+ }
+
+ public void setBootstrapServers(String bootstrapServers)
+ {
+ this.bootstrapServers = bootstrapServers;
+ }
+
+ public String getSchemaRegistryUrl()
+ {
+ return schemaRegistryUrl;
+ }
+
+ public void setSchemaRegistryUrl(String schemaRegistryUrl)
+ {
+ this.schemaRegistryUrl = schemaRegistryUrl;
+ }
+
+ public String getTopic()
+ {
+ return topic;
+ }
+
+ public void setTopic(String topic)
+ {
+ this.topic = topic;
+ }
+
+ public String getPath()
+ {
+ return path;
+ }
+
+ public void setPath(String path)
+ {
+ this.path = path;
+ }
+}
--- /dev/null
+management.endpoints.web.exposure.include=*
+
+logging.level.de.juplo=info