depends_on:
- zookeeper
- schema-registry:
- image: confluentinc/cp-schema-registry:6.2.0
- hostname: schema-registry
- ports:
- - "8081:8081"
- environment:
- SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka:9093
- SCHEMA_REGISTRY_HOST_NAME: schema-registry
- SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
- depends_on:
- - zookeeper
- - kafka
-
- transfer
+ transfer:
image: juplo/transfer-service:mvp
ports:
- "8091:8080"
<description>An MVP for the Transfer Service</description>
<properties>
<java.version>11</java.version>
- <avro.version>1.10.2</avro.version>
<confluent.version>6.2.0</confluent.version>
<kafka.version>2.8.0</kafka.version>
</properties>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
- <dependency>
- <groupId>org.apache.avro</groupId>
- <artifactId>avro</artifactId>
- <version>${avro.version}</version>
- </dependency>
- <dependency>
- <groupId>io.confluent</groupId>
- <artifactId>kafka-streams-avro-serde</artifactId>
- <version>${confluent.version}</version>
- </dependency>
<dependency>
<groupId>jakarta.validation</groupId>
<artifactId>jakarta.validation-api</artifactId>
</dependency>
</dependencies>
- <repositories>
- <repository>
- <id>confluent</id>
- <url>https://packages.confluent.io/maven/</url>
- </repository>
- </repositories>
-
<build>
<plugins>
<plugin>
</excludes>
</configuration>
</plugin>
- <plugin>
- <groupId>org.apache.avro</groupId>
- <artifactId>avro-maven-plugin</artifactId>
- <version>${avro.version}</version>
- <executions>
- <execution>
- <phase>generate-sources</phase>
- <goals>
- <goal>schema</goal>
- </goals>
- <configuration>
- <sourceDirectory>${project.basedir}/src/main/resources</sourceDirectory>
- <outputDirectory>${project.basedir}/target/generated-sources/</outputDirectory>
- </configuration>
- </execution>
- </executions>
- </plugin>
</plugins>
</build>
+++ /dev/null
-package de.juplo.kafka.payment.transfer;
-
-import lombok.Builder;
-import lombok.Data;
-
-import javax.validation.constraints.NotNull;
-import java.util.UUID;
-
-
-/**
- * Simple DTO used by the REST interface
- */
-@Data
-@Builder
-public class TransferBean
-{
- @NotNull(message = "Cannot be null")
- private UUID id;
- @NotNull(message = "Cannot be null")
- private long payer;
- @NotNull(message = "Cannot be null")
- private long payee;
- @NotNull(message = "Cannot be null")
- private int amount;
-}
package de.juplo.kafka.payment.transfer;
-import java.net.URI;
-import java.util.UUID;
-
-import de.juplo.kafka.payment.avro.Order;
-import de.juplo.kafka.payment.avro.OrderState;
+import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.springframework.web.context.request.async.DeferredResult;
import javax.validation.Valid;
+import java.net.URI;
+import java.util.UUID;
@RestController
{
private final static Logger LOG = LoggerFactory.getLogger(TransferService.class);
- private final KafkaProducer<UUID, TransferBean> producer;
+ private final KafkaProducer<UUID, String> producer;
+ private final ObjectMapper mapper;
private final String topic;
private final String path;
TransferService(
- final KafkaProducer<UUID, TransferBean> producer,
+ final KafkaProducer<UUID, String> producer,
+ final ObjectMapper mapper,
final TransferServiceProperties properties)
{
this.producer = producer;
+ this.mapper = mapper;
this.topic = properties.getTopic();
this.path = properties.getPath();
}
@PostMapping(
- path = "/orders",
- consumes = MediaType.APPLICATION_JSON_UTF8_VALUE,
+ path = "/transfer",
+ consumes = MediaType.APPLICATION_JSON_VALUE,
produces = MediaType.TEXT_PLAIN_VALUE)
- public DeferredResult<ResponseEntity<?>> placeOrder(@Valid @RequestBody TransferBean transfer)
+ public DeferredResult<ResponseEntity<?>> transfer(@Valid @RequestBody Transfer transfer)
{
DeferredResult<ResponseEntity<?>> result = new DeferredResult<>();
try
{
- UUID uuid = UUID.randomUUID();
- ProducerRecord<UUID, TransferBean> record =
+ ProducerRecord<UUID, String> record =
new ProducerRecord<>(
- topic,
- uuid,
- Transfer
- .new
- .id(transfer.getId().toString())
- .setState(TransferState.CREATED)
- .setCustomerId(transfer.getCustomerId())
- .setOrderId(transfer.getId())
- .setProductId(transfer.getProductId())
- .setQuantity(transfer.getQuantity())
- .build());
+ topic,
+ transfer.getId(),
+ mapper.writeValueAsString(transfer));
producer.send(record, (metadata, exception) ->
{
return;
}
- result.setResult(ResponseEntity.created(URI.create(path + uuid)).build());
+ result.setResult(ResponseEntity.created(URI.create(path + transfer.getId())).build());
});
}
catch (Exception e)
package de.juplo.kafka.payment.transfer;
-import java.util.Properties;
-import java.util.UUID;
-
-import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.serialization.UUIDSerializer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
+import java.util.Properties;
+import java.util.UUID;
+
@SpringBootApplication
@EnableConfigurationProperties(TransferServiceProperties.class)
public class TransferServiceApplication
{
@Bean(destroyMethod = "close")
- KafkaProducer<UUID, TransferBean> producer(TransferServiceProperties properties)
+ KafkaProducer<UUID, String> producer(TransferServiceProperties properties)
{
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.bootstrapServers);
- props.put("schema.registry.url", properties.schemaRegistryUrl);
- props.put("key.serializer", UUIDSerializer.class.getName());
- props.put("value.serializer", SpecificAvroSerializer.class.getName());
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, UUIDSerializer.class);
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new KafkaProducer<>(props);
}
package de.juplo.kafka.payment.transfer;
+import lombok.Getter;
+import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
-@ConfigurationProperties("take-order")
+@ConfigurationProperties("juplo.transfer")
+@Getter
+@Setter
public class TransferServiceProperties
{
- String bootstrapServers = "kafka:9092";
- String schemaRegistryUrl = "http://schema-registry:8081";
- String topic = "orders";
- String path = "http://details:8092/orders/";
-
-
- public String getBootstrapServers()
- {
- return bootstrapServers;
- }
-
- public void setBootstrapServers(String bootstrapServers)
- {
- this.bootstrapServers = bootstrapServers;
- }
-
- public String getSchemaRegistryUrl()
- {
- return schemaRegistryUrl;
- }
-
- public void setSchemaRegistryUrl(String schemaRegistryUrl)
- {
- this.schemaRegistryUrl = schemaRegistryUrl;
- }
-
- public String getTopic()
- {
- return topic;
- }
-
- public void setTopic(String topic)
- {
- this.topic = topic;
- }
-
- public String getPath()
- {
- return path;
- }
-
- public void setPath(String path)
- {
- this.path = path;
- }
+ String bootstrapServers = "localhost:9092";
+ String topic = "transfers";
+ String path = "http://details:8092/transfers/";
}
+++ /dev/null
-[
- {
- "namespace": "de.juplo.kafka.payment.avro",
- "type": "enum",
- "name": "TransferState",
- "symbols" : [ "PENDING" ],
- "default" : "PENDING"
- },
- {
- "namespace": "de.juplo.kafka.payment.avro",
- "type": "record",
- "name": "Transfer",
- "fields": [
- { "name": "id", "type": "string" },
- { "name": "state", "type": "TransferState" },
- { "name": "payer", "type": "long" },
- { "name": "payee", "type": "long" },
- { "name": "amount", "type": "int" }
- ]
- }
-]