-package de.juplo.kafka.payment.transfer.domain;
-
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-
-import java.util.Optional;
-
-import static de.juplo.kafka.payment.transfer.domain.Transfer.State.*;
-
-
-@Slf4j
-@RequiredArgsConstructor
-public class TransferService
-{
- private final TransferRepository repository;
- private final KafkaProducer<String, String> producer;
- private final ObjectMapper mapper;
- private final String topic;
-
- public synchronized void initiate(Transfer transfer)
- {
- repository
- .get(transfer.getId())
- .ifPresentOrElse(
- stored ->
- {
- if (!transfer.equals(stored))
- throw new IllegalArgumentException(
- "Re-Initiation of transfer with different data: old=" +
- stored +
- ", new=" +
- transfer);
-
- if (stored.getState() == FAILED)
- {
- repository.update(transfer.getId(), FAILED, SENT);
- log.info("Resending faild transfer: " + stored);
- send(transfer);
- }
- },
- () ->
- {
- send(transfer);
- transfer.setState(SENT);
- repository.store(transfer);
- });
- }
-
-
- private void send(Transfer transfer)
- {
- try
- {
- ProducerRecord<String, String> record =
- new ProducerRecord<>(
- topic,
- Long.toString(transfer.getId()),
- mapper.writeValueAsString(transfer));
-
- producer.send(record, (metadata, exception) ->
- {
- if (metadata != null)
- {
- log.debug("Sent {} to {}/{}:{}", transfer, metadata.topic(), metadata.partition(), metadata.offset());
- repository.update(transfer.getId(), SENT, PENDING);
- }
- else
- {
- log.error("Could not send {}: {}", transfer, exception.getMessage());
- repository.update(transfer.getId(), SENT, FAILED);
- }
- });
- }
- catch (JsonProcessingException e)
- {
- throw new RuntimeException("Could not convert " + transfer, e);
- }
- }
-
- public Optional<Transfer> get(Long id)
- {
- return repository.get(id);
- }
-}