1 package de.juplo.kafka.payment.transfer.domain;
4 import com.fasterxml.jackson.core.JsonProcessingException;
5 import com.fasterxml.jackson.databind.ObjectMapper;
6 import lombok.RequiredArgsConstructor;
7 import lombok.extern.slf4j.Slf4j;
8 import org.apache.kafka.clients.producer.KafkaProducer;
9 import org.apache.kafka.clients.producer.ProducerRecord;
11 import java.util.Optional;
13 import static de.juplo.kafka.payment.transfer.domain.Transfer.State.*;
/**
 * Domain service that initiates transfers and publishes them through a Kafka producer.
 *
 * <p>All collaborators are held in final fields; Lombok's
 * {@code @RequiredArgsConstructor} generates the constructor that receives them.
 *
 * <p>NOTE(review): the {@code log} used below is not declared in the visible
 * lines -- presumably it is provided by Lombok's {@code @Slf4j}, which is
 * imported by this file; TODO confirm.
 */
17 @RequiredArgsConstructor
18 public class TransferService
// Store that is queried for known transfers and updated with state changes.
20 private final TransferRepository repository;
// Producer with String keys and String values, used by send().
21 private final KafkaProducer<String, String> producer;
// Serializes a Transfer into the String payload of the record.
22 private final ObjectMapper mapper;
// Presumably the name of the target Kafka topic; its use is not visible here -- TODO confirm.
23 private final String topic;
/**
 * Initiates the given transfer, or handles the repeated initiation of a
 * transfer that is already known.
 *
 * <p>The method is {@code synchronized}, so concurrent calls to it on the same
 * service instance are serialized.
 *
 * <p>NOTE(review): several statements of this method are not visible here
 * (the receiver of the lookup, where {@code stored} comes from, and the
 * condition under which the transfer is marked SENT and stored). The comments
 * below describe only the visible lines.
 *
 * @param transfer the transfer to initiate
 * @throws IllegalArgumentException if {@code transfer} is not equal to
 *         {@code stored}
 */
25 public synchronized void initiate(Transfer transfer)
// Lookup by the id of the transfer -- presumably on the repository; TODO confirm.
28 .get(transfer.getId())
// A repeated initiation is rejected if its data differs from the stored transfer.
32 if (!transfer.equals(stored))
33 throw new IllegalArgumentException(
34 "Re-Initiation of transfer with different data: old=" +
// A transfer whose stored state is FAILED gets its state updated from FAILED to SENT
// (presumably so that it can be sent again -- the resend call itself is not visible here).
39 if (stored.getState() == FAILED)
41 repository.update(transfer.getId(), FAILED, SENT);
// NOTE(review): the message contains the typo "faild" and is built by string
// concatenation instead of a "{}" placeholder; left unchanged, it is runtime text.
42 log.info("Resending faild transfer: " + stored);
// Marks the given transfer as SENT and stores it -- presumably the path for a
// transfer that is not known yet; TODO confirm against the full source.
49 transfer.setState(SENT);
50 repository.store(transfer);
/**
 * Hands the given transfer to the Kafka producer and records the outcome in
 * the repository from within the send callback.
 *
 * <p>The record key is the transfer id rendered as a decimal String, the
 * record value is the transfer serialized by the {@code ObjectMapper}.
 *
 * @param transfer the transfer to send
 * @throws RuntimeException if the transfer cannot be serialized; the
 *         {@code JsonProcessingException} is kept as the cause
 */
55 private void send(Transfer transfer)
59 ProducerRecord<String, String> record =
62 Long.toString(transfer.getId()),
63 mapper.writeValueAsString(transfer));
// The callback receives the record metadata and/or the exception of the send attempt.
65 producer.send(record, (metadata, exception) ->
// Outcome 1: log topic, partition and offset, then update the state from SENT to PENDING.
// NOTE(review): the condition that separates the two outcomes is not visible here.
// Depending on the Kafka client version, metadata may be non-null even on failure,
// so verify that the condition tests the exception -- TODO confirm.
69 log.debug("Sent {} to {}/{}:{}", transfer, metadata.topic(), metadata.partition(), metadata.offset());
70 repository.update(transfer.getId(), SENT, PENDING);
// Outcome 2: log the error and update the state from SENT to FAILED.
// NOTE(review): only the exception message is logged, not the stack trace.
74 log.error("Could not send {}: {}", transfer, exception.getMessage());
75 repository.update(transfer.getId(), SENT, FAILED);
// Serialization failures are rethrown unchecked, with the original exception as cause.
79 catch (JsonProcessingException e)
81 throw new RuntimeException("Could not convert " + transfer, e);
/**
 * Looks up a transfer by its id by delegating to the repository.
 *
 * @param id the id of the requested transfer
 * @return the result of {@code repository.get(id)}
 */
85 public Optional<Transfer> get(Long id)
87 return repository.get(id);