+++ /dev/null
-[
- {
- "namespace": "de.juplo.kafka.payment.avro",
- "type": "enum",
- "name": "OrderState",
- "symbols" : [ "CREATED" ],
- "default" : "CREATED"
- },
- {
- "namespace": "de.juplo.kafka.payment.avro",
- "type": "record",
- "name": "Order",
- "fields": [
- { "name": "id", "type": "string" },
- { "name": "state", "type": "OrderState" },
- { "name": "customerId", "type": "long" },
- { "name": "orderId", "type": "long" },
- { "name": "productId", "type": "long" },
- { "name": "quantity", "type": "int" }
- ]
- }
-]
<goal>schema</goal>
</goals>
<configuration>
- <sourceDirectory>${project.basedir}/</sourceDirectory>
+ <sourceDirectory>${project.basedir}/src/main/resources</sourceDirectory>
<outputDirectory>${project.basedir}/target/generated-sources/</outputDirectory>
</configuration>
</execution>
{
private final static Logger LOG = LoggerFactory.getLogger(TransferService.class);
- private final KafkaProducer<UUID, Order> producer;
+ private final KafkaProducer<UUID, TransferBean> producer;
private final String topic;
private final String path;
TransferService(
- final KafkaProducer<UUID,Order> producer,
+ final KafkaProducer<UUID, TransferBean> producer,
final TransferServiceProperties properties)
{
this.producer = producer;
path = "/orders",
consumes = MediaType.APPLICATION_JSON_UTF8_VALUE,
produces = MediaType.TEXT_PLAIN_VALUE)
- public DeferredResult<ResponseEntity<?>> placeOrder(@Valid @RequestBody Transfer order)
+ public DeferredResult<ResponseEntity<?>> placeOrder(@Valid @RequestBody TransferBean transfer)
{
DeferredResult<ResponseEntity<?>> result = new DeferredResult<>();
try
{
UUID uuid = UUID.randomUUID();
- ProducerRecord<UUID, Order> record =
+ ProducerRecord<UUID, TransferBean> record =
new ProducerRecord<>(
topic,
uuid,
- Order
- .newBuilder()
- .setId(uuid.toString())
- .setState(OrderState.CREATED)
- .setCustomerId(order.getCustomerId())
- .setOrderId(order.getId())
- .setProductId(order.getProductId())
- .setQuantity(order.getQuantity())
+ Transfer
+ .new
+ .id(transfer.getId().toString())
+ .setState(TransferState.CREATED)
+ .setCustomerId(transfer.getCustomerId())
+ .setOrderId(transfer.getId())
+ .setProductId(transfer.getProductId())
+ .setQuantity(transfer.getQuantity())
.build());
producer.send(record, (metadata, exception) ->
{
if (exception != null)
{
- LOG.error("Could not place order {}: {}", order, exception.toString());
+ LOG.error("Could not place order {}: {}", transfer, exception.toString());
result.setErrorResult(exception);
return;
}
import java.util.Properties;
import java.util.UUID;
-import de.juplo.kafka.payment.avro.Order;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.UUIDSerializer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
public class TransferServiceApplication
{
@Bean(destroyMethod = "close")
- KafkaProducer<UUID, Order> producer(TransferServiceProperties properties)
+ KafkaProducer<UUID, TransferBean> producer(TransferServiceProperties properties)
{
Properties props = new Properties();
- props.put("bootstrap.servers", properties.bootstrapServers);
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.bootstrapServers);
props.put("schema.registry.url", properties.schemaRegistryUrl);
props.put("key.serializer", UUIDSerializer.class.getName());
props.put("value.serializer", SpecificAvroSerializer.class.getName());
--- /dev/null
+[
+ {
+ "namespace": "de.juplo.kafka.payment.avro",
+ "type": "enum",
+ "name": "TransferState",
+ "symbols" : [ "PENDING" ],
+ "default" : "PENDING"
+ },
+ {
+ "namespace": "de.juplo.kafka.payment.avro",
+ "type": "record",
+ "name": "Transfer",
+ "fields": [
+ { "name": "id", "type": "string" },
+ { "name": "state", "type": "TransferState" },
+ { "name": "payer", "type": "long" },
+ { "name": "payee", "type": "long" },
+ { "name": "amount", "type": "int" }
+ ]
+ }
+]