WIP
[demos/kafka/demos-kafka-payment-system-transfer] / transfer / src / main / java / de / juplo / kafka / payment / transfer / TransferService.java
index 15550af..9e72af1 100644 (file)
@@ -1,11 +1,7 @@
 package de.juplo.kafka.payment.transfer;
 
 
-import java.net.URI;
-import java.util.UUID;
-
-import de.juplo.kafka.payment.avro.Order;
-import de.juplo.kafka.payment.avro.OrderState;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.slf4j.Logger;
@@ -18,6 +14,8 @@ import org.springframework.web.bind.annotation.RestController;
 import org.springframework.web.context.request.async.DeferredResult;
 
 import javax.validation.Valid;
+import java.net.URI;
+import java.util.UUID;
 
 
 @RestController
@@ -25,56 +23,50 @@ public class TransferService
 {
   private final static Logger LOG = LoggerFactory.getLogger(TransferService.class);
 
-  private final KafkaProducer<UUID, Order> producer;
+  private final KafkaProducer<UUID, String> producer;
+  private final ObjectMapper mapper;
   private final String topic;
   private final String path;
 
 
   TransferService(
-      final KafkaProducer<UUID,Order> producer,
+      final KafkaProducer<UUID, String> producer,
+      final ObjectMapper mapper,
       final TransferServiceProperties properties)
   {
     this.producer = producer;
+    this.mapper = mapper;
     this.topic = properties.getTopic();
     this.path = properties.getPath();
   }
 
 
   @PostMapping(
-      path = "/orders",
-      consumes = MediaType.APPLICATION_JSON_UTF8_VALUE,
+      path = "/transfer",
+      consumes = MediaType.APPLICATION_JSON_VALUE,
       produces = MediaType.TEXT_PLAIN_VALUE)
-  public DeferredResult<ResponseEntity<?>> placeOrder(@Valid @RequestBody Transfer order)
+  public DeferredResult<ResponseEntity<?>> transfer(@Valid @RequestBody Transfer transfer)
   {
     DeferredResult<ResponseEntity<?>> result = new DeferredResult<>();
 
     try
     {
-      UUID uuid = UUID.randomUUID();
-      ProducerRecord<UUID, Order> record =
+      ProducerRecord<UUID, String> record =
           new ProducerRecord<>(
-              topic, 
-              uuid,
-              Order
-                  .newBuilder()
-                  .setId(uuid.toString())
-                  .setState(OrderState.CREATED)
-                  .setCustomerId(order.getCustomerId())
-                  .setOrderId(order.getId())
-                  .setProductId(order.getProductId())
-                  .setQuantity(order.getQuantity())
-                  .build());
+              topic,
+              transfer.getId(),
+              mapper.writeValueAsString(transfer));
 
       producer.send(record, (metadata, exception) ->
       {
         if (exception != null)
         {
-          LOG.error("Could not place order {}: {}", order, exception.toString());
+          LOG.error("Could not place order {}: {}", transfer, exception.toString());
           result.setErrorResult(exception);
           return;
         }
 
-        result.setResult(ResponseEntity.created(URI.create(path + uuid)).build());
+        result.setResult(ResponseEntity.created(URI.create(path + transfer.getId())).build());
       });
     }
     catch (Exception e)