-package de.juplo.kafka.payment.transfer.impl;
-
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import de.juplo.kafka.payment.transfer.api.Transfer;
-import de.juplo.kafka.payment.transfer.api.TransferService;
-import de.juplo.kafka.payment.transfer.persistence.TransferRepository;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.springframework.http.ResponseEntity;
-import org.springframework.web.bind.annotation.RequestBody;
-import org.springframework.web.context.request.async.DeferredResult;
-
-import javax.validation.Valid;
-import java.net.URI;
-import java.util.UUID;
-
-
-@Slf4j
-@RequiredArgsConstructor
-public class TransferServiceImpl implements TransferService
-{
- private final TransferRepository repository;
- private final KafkaProducer<UUID, String> producer;
- private final ObjectMapper mapper;
- private final String topic;
-
-
- @Override
- public void initiate(Transfer transfer)
- {
- repository
- .get(transfer.getId())
- .ifPresentOrElse(
- stored ->
- {
-
- },
- () ->
- {
-
- });
- try
- {
- ProducerRecord<UUID, String> record =
- new ProducerRecord<>(
- topic,
- transfer.getId(),
- mapper.writeValueAsString(transfer));
-
- producer.send(record, (metadata, exception) ->
- {
- if (exception != null)
- {
- log.error("Could not place order {}: {}", transfer, exception.toString());
- result.setErrorResult(exception);
- return;
- }
-
- result.setResult(ResponseEntity.created(URI.create(path + transfer.getId())).build());
- });
- }
- catch (Exception e)
- {
- log.error("Unexpected exception!", e);
- result.setErrorResult(e);
- }
-
- return result;
- }
-}