From: Kai Moritz Date: Fri, 11 Jun 2021 15:04:47 +0000 (+0200) Subject: WIP X-Git-Tag: wip-initialer-commit~16 X-Git-Url: https://juplo.de/gitweb/?p=demos%2Fkafka%2Fdemos-kafka-payment-system-transfer;a=commitdiff_plain;h=192d9b20969fbbbe748430f2cb6e780916321287 WIP --- diff --git a/docker-compose.yml b/docker-compose.yml index e6655e7..ef24c74 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -20,20 +20,7 @@ services: depends_on: - zookeeper - schema-registry: - image: confluentinc/cp-schema-registry:6.2.0 - hostname: schema-registry - ports: - - "8081:8081" - environment: - SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka:9093 - SCHEMA_REGISTRY_HOST_NAME: schema-registry - SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081 - depends_on: - - zookeeper - - kafka - - transfer + transfer: image: juplo/transfer-service:mvp ports: - "8091:8080" diff --git a/transfer/pom.xml b/transfer/pom.xml index d18f4f3..daea31b 100644 --- a/transfer/pom.xml +++ b/transfer/pom.xml @@ -15,7 +15,6 @@ An MVP for the Transfer Service 11 - 1.10.2 6.2.0 2.8.0 @@ -39,16 +38,6 @@ runtime true - - org.apache.avro - avro - ${avro.version} - - - io.confluent - kafka-streams-avro-serde - ${confluent.version} - jakarta.validation jakarta.validation-api @@ -65,13 +54,6 @@ - - - confluent - https://packages.confluent.io/maven/ - - - @@ -86,23 +68,6 @@ - - org.apache.avro - avro-maven-plugin - ${avro.version} - - - generate-sources - - schema - - - ${project.basedir}/src/main/resources - ${project.basedir}/target/generated-sources/ - - - - diff --git a/transfer/src/main/java/de/juplo/kafka/payment/transfer/TransferBean.java b/transfer/src/main/java/de/juplo/kafka/payment/transfer/TransferBean.java deleted file mode 100644 index 498fb69..0000000 --- a/transfer/src/main/java/de/juplo/kafka/payment/transfer/TransferBean.java +++ /dev/null @@ -1,25 +0,0 @@ -package de.juplo.kafka.payment.transfer; - -import lombok.Builder; -import lombok.Data; - -import javax.validation.constraints.NotNull; -import java.util.UUID; - - -/** - * Simple DTO used by the REST interface - */ -@Data -@Builder -public class TransferBean -{ - @NotNull(message = "Cannot be null") - private UUID id; - @NotNull(message = "Cannot be null") - private long payer; - @NotNull(message = "Cannot be null") - private long payee; - @NotNull(message = "Cannot be null") - private int amount; -} diff --git a/transfer/src/main/java/de/juplo/kafka/payment/transfer/TransferService.java b/transfer/src/main/java/de/juplo/kafka/payment/transfer/TransferService.java index 1b81028..9e72af1 100644 --- a/transfer/src/main/java/de/juplo/kafka/payment/transfer/TransferService.java +++ b/transfer/src/main/java/de/juplo/kafka/payment/transfer/TransferService.java @@ -1,11 +1,7 @@ package de.juplo.kafka.payment.transfer; -import java.net.URI; -import java.util.UUID; - -import de.juplo.kafka.payment.avro.Order; -import de.juplo.kafka.payment.avro.OrderState; +import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.slf4j.Logger; @@ -18,6 +14,8 @@ import org.springframework.web.bind.annotation.RestController; import org.springframework.web.context.request.async.DeferredResult; import javax.validation.Valid; +import java.net.URI; +import java.util.UUID; @RestController @@ -25,45 +23,39 @@ public class TransferService { private final static Logger LOG = LoggerFactory.getLogger(TransferService.class); - private final KafkaProducer producer; + private final KafkaProducer producer; + private final ObjectMapper mapper; private final String topic; private final String path; TransferService( - final KafkaProducer producer, + final KafkaProducer producer, + final ObjectMapper mapper, final TransferServiceProperties properties) { this.producer = producer; + this.mapper = mapper; this.topic = properties.getTopic(); this.path = properties.getPath(); } @PostMapping( - path = "/orders", - consumes = MediaType.APPLICATION_JSON_UTF8_VALUE, + path = "/transfer", + consumes = MediaType.APPLICATION_JSON_VALUE, produces = MediaType.TEXT_PLAIN_VALUE) - public DeferredResult> placeOrder(@Valid @RequestBody TransferBean transfer) + public DeferredResult> transfer(@Valid @RequestBody Transfer transfer) { DeferredResult> result = new DeferredResult<>(); try { - UUID uuid = UUID.randomUUID(); - ProducerRecord record = + ProducerRecord record = new ProducerRecord<>( - topic, - uuid, - Transfer - .new - .id(transfer.getId().toString()) - .setState(TransferState.CREATED) - .setCustomerId(transfer.getCustomerId()) - .setOrderId(transfer.getId()) - .setProductId(transfer.getProductId()) - .setQuantity(transfer.getQuantity()) - .build()); + topic, + transfer.getId(), + mapper.writeValueAsString(transfer)); producer.send(record, (metadata, exception) -> { @@ -74,7 +66,7 @@ public class TransferService return; } - result.setResult(ResponseEntity.created(URI.create(path + uuid)).build()); + result.setResult(ResponseEntity.created(URI.create(path + transfer.getId())).build()); }); } catch (Exception e) diff --git a/transfer/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java b/transfer/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java index 26b14d6..484d9c2 100644 --- a/transfer/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java +++ b/transfer/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java @@ -1,19 +1,19 @@ package de.juplo.kafka.payment.transfer; -import java.util.Properties; -import java.util.UUID; - -import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.serialization.UUIDSerializer; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; +import java.util.Properties; +import java.util.UUID; + @SpringBootApplication @EnableConfigurationProperties(TransferServiceProperties.class) @@ -21,13 +21,12 @@ import org.springframework.context.annotation.Bean; public class TransferServiceApplication { @Bean(destroyMethod = "close") - KafkaProducer producer(TransferServiceProperties properties) + KafkaProducer producer(TransferServiceProperties properties) { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.bootstrapServers); - props.put("schema.registry.url", properties.schemaRegistryUrl); - props.put("key.serializer", UUIDSerializer.class.getName()); - props.put("value.serializer", SpecificAvroSerializer.class.getName()); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, UUIDSerializer.class); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return new KafkaProducer<>(props); } diff --git a/transfer/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceProperties.java b/transfer/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceProperties.java index 8a50001..bea701f 100644 --- a/transfer/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceProperties.java +++ b/transfer/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceProperties.java @@ -1,55 +1,17 @@ package de.juplo.kafka.payment.transfer; +import lombok.Getter; +import lombok.Setter; import org.springframework.boot.context.properties.ConfigurationProperties; -@ConfigurationProperties("take-order") +@ConfigurationProperties("juplo.transfer") +@Getter +@Setter public class TransferServiceProperties { - String bootstrapServers = "kafka:9092"; - String schemaRegistryUrl = "http://schema-registry:8081"; - String topic = "orders"; - String path = "http://details:8092/orders/"; - - - public String getBootstrapServers() - { - return bootstrapServers; - } - - public void setBootstrapServers(String bootstrapServers) - { - this.bootstrapServers = bootstrapServers; - } - - public String getSchemaRegistryUrl() - { - return schemaRegistryUrl; - } - - public void setSchemaRegistryUrl(String schemaRegistryUrl) - { - this.schemaRegistryUrl = schemaRegistryUrl; - } - - public String getTopic() - { - return topic; - } - - public void setTopic(String topic) - { - this.topic = topic; - } - - public String getPath() - { - return path; - } - - public void setPath(String path) - { - this.path = path; - } + String bootstrapServers = "localhost:9092"; + String topic = "transfers"; + String path = "http://details:8092/transfers/"; } diff --git a/transfer/src/main/resources/transfer.avr b/transfer/src/main/resources/transfer.avr deleted file mode 100644 index 1aab7a2..0000000 --- a/transfer/src/main/resources/transfer.avr +++ /dev/null @@ -1,21 +0,0 @@ -[ - { - "namespace": "de.juplo.kafka.payment.avro", - "type": "enum", - "name": "TransferState", - "symbols" : [ "PENDING" ], - "default" : "PENDING" - }, - { - "namespace": "de.juplo.kafka.payment.avro", - "type": "record", - "name": "Transfer", - "fields": [ - { "name": "id", "type": "string" }, - { "name": "state", "type": "TransferState" }, - { "name": "payer", "type": "long" }, - { "name": "payee", "type": "long" }, - { "name": "amount", "type": "int" } - ] - } -]