import com.fasterxml.jackson.databind.ObjectMapper;
+import de.juplo.kafka.payment.transfer.domain.TransferRepository;
import de.juplo.kafka.payment.transfer.domain.TransferService;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
@Bean
TransferService transferService(
+ TransferRepository repository,
KafkaProducer<UUID, String> producer,
ObjectMapper mapper,
TransferServiceProperties properties)
{
- return new TransferService(producer, mapper, properties.topic);
+ return new TransferService(repository, producer, mapper, properties.topic);
}
produces = MediaType.APPLICATION_JSON_VALUE)
public ResponseEntity<?> transfer(@Valid @RequestBody TransferRequest transferRequest)
{
- service.initiate(transferRequest);
+ Transfer transfer =
+ Transfer
+ .builder()
+ .id(transferRequest.getId())
+ .payer(transferRequest.getPayer())
+ .payee(transferRequest.getPayee())
+ .amount(transferRequest.getAmount())
+ .build();
+
+ service.initiate(transfer);
+
return ResponseEntity.created(URI.create(PATH + transferRequest.getId())).build();
}
}
import lombok.Data;
import javax.validation.constraints.NotNull;
-import java.util.UUID;
/**
public class TransferRequest
{
@NotNull(message = "Cannot be null")
- private UUID id;
+ private long id;
@NotNull(message = "Cannot be null")
private long payer;
@NotNull(message = "Cannot be null")
package de.juplo.kafka.payment.transfer.domain;
+import lombok.Builder;
import lombok.Data;
@Data
+@Builder
public class Transfer
{
public enum State
{
+ CREATED,
+ FAILED,
PENDING,
APPROVED,
REJECTED
public interface TransferRepository
{
void put(Transfer transfer);
- Optional<Transfer> get(UUID uuid);
+ Optional<Transfer> get(Long id);
}
private final ObjectMapper mapper;
private final String topic;
- public void initiate(Transfer transfer)
+ public synchronized void initiate(Transfer transfer)
{
repository
.get(transfer.getId())
.ifPresentOrElse(
stored ->
{
-
+ switch (stored.getState())
+ {
+ case FAILED:
+ }
},
() ->
{
-
});
+ }
+
+
+ private void create(Transfer transfer)
+ {
try
{
ProducerRecord<UUID, String> record =