package de.juplo.kafka.payment.transfer;
-import java.net.URI;
-import java.util.UUID;
-
-import de.juplo.kafka.payment.avro.Order;
-import de.juplo.kafka.payment.avro.OrderState;
+import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.springframework.web.context.request.async.DeferredResult;
import javax.validation.Valid;
+import java.net.URI;
+import java.util.UUID;
@RestController
{
private final static Logger LOG = LoggerFactory.getLogger(TransferService.class);
- private final KafkaProducer<UUID, TransferBean> producer;
+ private final KafkaProducer<UUID, String> producer;
+ private final ObjectMapper mapper;
private final String topic;
private final String path;
TransferService(
- final KafkaProducer<UUID, TransferBean> producer,
+ final KafkaProducer<UUID, String> producer,
+ final ObjectMapper mapper,
final TransferServiceProperties properties)
{
this.producer = producer;
+ this.mapper = mapper;
this.topic = properties.getTopic();
this.path = properties.getPath();
}
@PostMapping(
- path = "/orders",
- consumes = MediaType.APPLICATION_JSON_UTF8_VALUE,
+ path = "/transfer",
+ consumes = MediaType.APPLICATION_JSON_VALUE,
produces = MediaType.TEXT_PLAIN_VALUE)
- public DeferredResult<ResponseEntity<?>> placeOrder(@Valid @RequestBody TransferBean transfer)
+ public DeferredResult<ResponseEntity<?>> transfer(@Valid @RequestBody Transfer transfer)
{
DeferredResult<ResponseEntity<?>> result = new DeferredResult<>();
try
{
- UUID uuid = UUID.randomUUID();
- ProducerRecord<UUID, TransferBean> record =
+ ProducerRecord<UUID, String> record =
new ProducerRecord<>(
- topic,
- uuid,
- Transfer
- .new
- .id(transfer.getId().toString())
- .setState(TransferState.CREATED)
- .setCustomerId(transfer.getCustomerId())
- .setOrderId(transfer.getId())
- .setProductId(transfer.getProductId())
- .setQuantity(transfer.getQuantity())
- .build());
+ topic,
+ transfer.getId(),
+ mapper.writeValueAsString(transfer));
producer.send(record, (metadata, exception) ->
{
return;
}
- result.setResult(ResponseEntity.created(URI.create(path + uuid)).build());
+ result.setResult(ResponseEntity.created(URI.create(path + transfer.getId())).build());
});
}
catch (Exception e)