-package de.juplo.kafka.payment.transfer;
-
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.http.MediaType;
-import org.springframework.http.ResponseEntity;
-import org.springframework.web.bind.annotation.PostMapping;
-import org.springframework.web.bind.annotation.RequestBody;
-import org.springframework.web.bind.annotation.RestController;
-import org.springframework.web.context.request.async.DeferredResult;
-
-import javax.validation.Valid;
-import java.net.URI;
-import java.util.UUID;
-
-
-@RestController
-public class TransferService
-{
- private final static Logger LOG = LoggerFactory.getLogger(TransferService.class);
-
- private final KafkaProducer<UUID, String> producer;
- private final ObjectMapper mapper;
- private final String topic;
- private final String path;
-
-
- TransferService(
- final KafkaProducer<UUID, String> producer,
- final ObjectMapper mapper,
- final TransferServiceProperties properties)
- {
- this.producer = producer;
- this.mapper = mapper;
- this.topic = properties.getTopic();
- this.path = properties.getPath();
- }
-
-
- @PostMapping(
- path = "/transfer",
- consumes = MediaType.APPLICATION_JSON_VALUE,
- produces = MediaType.TEXT_PLAIN_VALUE)
- public DeferredResult<ResponseEntity<?>> transfer(@Valid @RequestBody Transfer transfer)
- {
- DeferredResult<ResponseEntity<?>> result = new DeferredResult<>();
-
- try
- {
- ProducerRecord<UUID, String> record =
- new ProducerRecord<>(
- topic,
- transfer.getId(),
- mapper.writeValueAsString(transfer));
-
- producer.send(record, (metadata, exception) ->
- {
- if (exception != null)
- {
- LOG.error("Could not place order {}: {}", transfer, exception.toString());
- result.setErrorResult(exception);
- return;
- }
-
- result.setResult(ResponseEntity.created(URI.create(path + transfer.getId())).build());
- });
- }
- catch (Exception e)
- {
- LOG.error("Unexpected exception!", e);
- result.setErrorResult(e);
- }
-
- return result;
- }
-}