--- /dev/null
+package de.juplo.kafka.payment.transfer.api;
+
+import lombok.Data;
+
+import javax.validation.constraints.NotNull;
+import java.util.UUID;
+
+
+/**
+ * Simple DTO used by the REST interface
+ */
+@Data
+public class Transfer
+{
+ @NotNull(message = "Cannot be null")
+ private UUID id;
+ @NotNull(message = "Cannot be null")
+ private long payer;
+ @NotNull(message = "Cannot be null")
+ private long payee;
+ @NotNull(message = "Cannot be null")
+ private int amount;
+}
--- /dev/null
+package de.juplo.kafka.payment.transfer.api;
+
+public interface TransferService
+{
+ void handleTransfer(Transfer transfer);
+}
--- /dev/null
+package de.juplo.kafka.payment.transfer.controller;
+
+
+import de.juplo.kafka.payment.transfer.api.Transfer;
+import de.juplo.kafka.payment.transfer.impl.TransferServiceImpl;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.http.MediaType;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RestController;
+
+import javax.validation.Valid;
+import java.net.URI;
+
+
+@RestController
+@RequiredArgsConstructor
+@Slf4j
+public class TransferController
+{
+ public final static String PATH = "/transfers";
+
+ private final TransferServiceImpl service;
+
+
+ @PostMapping(
+ path = PATH,
+ consumes = MediaType.APPLICATION_JSON_VALUE,
+ produces = MediaType.APPLICATION_JSON_VALUE)
+ public ResponseEntity<?> transfer(@Valid @RequestBody Transfer transfer)
+ {
+ service.initiate(transfer);
+ return ResponseEntity.created(URI.create(PATH + transfer.getId())).build();
+ }
+}
--- /dev/null
+package de.juplo.kafka.payment.transfer.impl;
+
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import de.juplo.kafka.payment.transfer.api.Transfer;
+import de.juplo.kafka.payment.transfer.api.TransferService;
+import de.juplo.kafka.payment.transfer.persistence.TransferRepository;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.context.request.async.DeferredResult;
+
+import javax.validation.Valid;
+import java.net.URI;
+import java.util.UUID;
+
+
+@Slf4j
+@RequiredArgsConstructor
+public class TransferServiceImpl implements TransferService
+{
+ private final TransferRepository repository;
+ private final KafkaProducer<UUID, String> producer;
+ private final ObjectMapper mapper;
+ private final String topic;
+
+
+ @Override
+ public void initiate(Transfer transfer)
+ {
+ repository
+ .get(transfer.getId())
+ .ifPresentOrElse(
+ stored ->
+ {
+
+ },
+ () ->
+ {
+
+ });
+ try
+ {
+ ProducerRecord<UUID, String> record =
+ new ProducerRecord<>(
+ topic,
+ transfer.getId(),
+ mapper.writeValueAsString(transfer));
+
+ producer.send(record, (metadata, exception) ->
+ {
+ if (exception != null)
+ {
+ log.error("Could not place order {}: {}", transfer, exception.toString());
+ result.setErrorResult(exception);
+ return;
+ }
+
+ result.setResult(ResponseEntity.created(URI.create(path + transfer.getId())).build());
+ });
+ }
+ catch (Exception e)
+ {
+ log.error("Unexpected exception!", e);
+ result.setErrorResult(e);
+ }
+
+ return result;
+ }
+}
--- /dev/null
+package de.juplo.kafka.payment.transfer.persistence;
+
+import de.juplo.kafka.payment.transfer.api.Transfer;
+
+import java.util.Optional;
+import java.util.UUID;
+
+
+public interface TransferRepository
+{
+ void put(Transfer transfer);
+ Optional<Transfer> get(UUID uuid);
+}