From: Kai Moritz Date: Fri, 11 Jun 2021 14:53:13 +0000 (+0200) Subject: WIP X-Git-Tag: wip-initialer-commit~18 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=98446d6726b6d6fee88211ffb345b424fdc00401;p=demos%2Fkafka%2Fdemos-kafka-payment-system-transfer WIP --- diff --git a/transfer/order.avsc b/transfer/order.avsc deleted file mode 100644 index 81775da..0000000 --- a/transfer/order.avsc +++ /dev/null @@ -1,22 +0,0 @@ -[ - { - "namespace": "de.juplo.kafka.payment.avro", - "type": "enum", - "name": "OrderState", - "symbols" : [ "CREATED" ], - "default" : "CREATED" - }, - { - "namespace": "de.juplo.kafka.payment.avro", - "type": "record", - "name": "Order", - "fields": [ - { "name": "id", "type": "string" }, - { "name": "state", "type": "OrderState" }, - { "name": "customerId", "type": "long" }, - { "name": "orderId", "type": "long" }, - { "name": "productId", "type": "long" }, - { "name": "quantity", "type": "int" } - ] - } -] diff --git a/transfer/pom.xml b/transfer/pom.xml index f1305cf..d18f4f3 100644 --- a/transfer/pom.xml +++ b/transfer/pom.xml @@ -97,7 +97,7 @@ schema - ${project.basedir}/ + ${project.basedir}/src/main/resources ${project.basedir}/target/generated-sources/ diff --git a/transfer/src/main/java/de/juplo/kafka/payment/transfer/TransferService.java b/transfer/src/main/java/de/juplo/kafka/payment/transfer/TransferService.java index 15550af..1b81028 100644 --- a/transfer/src/main/java/de/juplo/kafka/payment/transfer/TransferService.java +++ b/transfer/src/main/java/de/juplo/kafka/payment/transfer/TransferService.java @@ -25,13 +25,13 @@ public class TransferService { private final static Logger LOG = LoggerFactory.getLogger(TransferService.class); - private final KafkaProducer producer; + private final KafkaProducer producer; private final String topic; private final String path; TransferService( - final KafkaProducer producer, + final KafkaProducer producer, final TransferServiceProperties properties) { this.producer = producer; @@ -44,32 +44,32 @@ public class TransferService path = "/orders", consumes = MediaType.APPLICATION_JSON_UTF8_VALUE, produces = MediaType.TEXT_PLAIN_VALUE) - public DeferredResult> placeOrder(@Valid @RequestBody Transfer order) + public DeferredResult> placeOrder(@Valid @RequestBody TransferBean transfer) { DeferredResult> result = new DeferredResult<>(); try { UUID uuid = UUID.randomUUID(); - ProducerRecord record = + ProducerRecord record = new ProducerRecord<>( topic, uuid, - Order - .newBuilder() - .setId(uuid.toString()) - .setState(OrderState.CREATED) - .setCustomerId(order.getCustomerId()) - .setOrderId(order.getId()) - .setProductId(order.getProductId()) - .setQuantity(order.getQuantity()) + Transfer + .new + .id(transfer.getId().toString()) + .setState(TransferState.CREATED) + .setCustomerId(transfer.getCustomerId()) + .setOrderId(transfer.getId()) + .setProductId(transfer.getProductId()) + .setQuantity(transfer.getQuantity()) .build()); producer.send(record, (metadata, exception) -> { if (exception != null) { - LOG.error("Could not place order {}: {}", order, exception.toString()); + LOG.error("Could not place order {}: {}", transfer, exception.toString()); result.setErrorResult(exception); return; } diff --git a/transfer/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java b/transfer/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java index b42ca47..26b14d6 100644 --- a/transfer/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java +++ b/transfer/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java @@ -4,10 +4,10 @@ package de.juplo.kafka.payment.transfer; import java.util.Properties; import java.util.UUID; -import de.juplo.kafka.payment.avro.Order; import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.UUIDSerializer; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @@ -21,10 +21,10 @@ import org.springframework.context.annotation.Bean; public class TransferServiceApplication { @Bean(destroyMethod = "close") - KafkaProducer producer(TransferServiceProperties properties) + KafkaProducer producer(TransferServiceProperties properties) { Properties props = new Properties(); - props.put("bootstrap.servers", properties.bootstrapServers); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.bootstrapServers); props.put("schema.registry.url", properties.schemaRegistryUrl); props.put("key.serializer", UUIDSerializer.class.getName()); props.put("value.serializer", SpecificAvroSerializer.class.getName()); diff --git a/transfer/src/main/resources/transfer.avr b/transfer/src/main/resources/transfer.avr new file mode 100644 index 0000000..1aab7a2 --- /dev/null +++ b/transfer/src/main/resources/transfer.avr @@ -0,0 +1,21 @@ +[ + { + "namespace": "de.juplo.kafka.payment.avro", + "type": "enum", + "name": "TransferState", + "symbols" : [ "PENDING" ], + "default" : "PENDING" + }, + { + "namespace": "de.juplo.kafka.payment.avro", + "type": "record", + "name": "Transfer", + "fields": [ + { "name": "id", "type": "string" }, + { "name": "state", "type": "TransferState" }, + { "name": "payer", "type": "long" }, + { "name": "payee", "type": "long" }, + { "name": "amount", "type": "int" } + ] + } +]