import com.fasterxml.jackson.databind.ObjectMapper;
-import de.juplo.kafka.payment.transfer.impl.TransferServiceImpl;
+import de.juplo.kafka.payment.transfer.domain.TransferService;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
}
@Bean
- TransferServiceImpl transferService(
+ TransferService transferService(
KafkaProducer<UUID, String> producer,
ObjectMapper mapper,
TransferServiceProperties properties)
{
- return new TransferServiceImpl(producer, mapper, properties.topic);
+ return new TransferService(producer, mapper, properties.topic);
}
+++ /dev/null
-package de.juplo.kafka.payment.transfer.api;
-
-import lombok.Data;
-
-import javax.validation.constraints.NotNull;
-import java.util.UUID;
-
-
-/**
- * Simple DTO used by the REST interface
- */
-@Data
-public class Transfer
-{
- @NotNull(message = "Cannot be null")
- private UUID id;
- @NotNull(message = "Cannot be null")
- private long payer;
- @NotNull(message = "Cannot be null")
- private long payee;
- @NotNull(message = "Cannot be null")
- private int amount;
-}
+++ /dev/null
-package de.juplo.kafka.payment.transfer.api;
-
-public interface TransferService
-{
- void handleTransfer(Transfer transfer);
-}
package de.juplo.kafka.payment.transfer.controller;
-import de.juplo.kafka.payment.transfer.api.Transfer;
-import de.juplo.kafka.payment.transfer.impl.TransferServiceImpl;
+import de.juplo.kafka.payment.transfer.domain.Transfer;
+import de.juplo.kafka.payment.transfer.domain.TransferService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
{
public final static String PATH = "/transfers";
- private final TransferServiceImpl service;
+ private final TransferService service;
@PostMapping(
path = PATH,
consumes = MediaType.APPLICATION_JSON_VALUE,
produces = MediaType.APPLICATION_JSON_VALUE)
- public ResponseEntity<?> transfer(@Valid @RequestBody Transfer transfer)
+ public ResponseEntity<?> transfer(@Valid @RequestBody TransferRequest transferRequest)
{
- service.initiate(transfer);
- return ResponseEntity.created(URI.create(PATH + transfer.getId())).build();
+ service.initiate(transferRequest);
+ return ResponseEntity.created(URI.create(PATH + transferRequest.getId())).build();
}
}
--- /dev/null
+package de.juplo.kafka.payment.transfer.controller;
+
+import lombok.Data;
+
+import javax.validation.constraints.NotNull;
+import java.util.UUID;
+
+
+/**
+ * Simple DTO used by the REST interface
+ */
+@Data
+public class TransferRequest
+{
+ @NotNull(message = "Cannot be null")
+ private UUID id;
+ @NotNull(message = "Cannot be null")
+ private long payer;
+ @NotNull(message = "Cannot be null")
+ private long payee;
+ @NotNull(message = "Cannot be null")
+ private int amount;
+}
--- /dev/null
+package de.juplo.kafka.payment.transfer.domain;
+
+
+import lombok.Data;
+
+
+@Data
+public class Transfer
+{
+ public enum State
+ {
+ PENDING,
+ APPROVED,
+ REJECTED
+ }
+
+ private final long id;
+ private final long payer;
+ private final long payee;
+ private final int amount;
+
+ private State state;
+}
--- /dev/null
+package de.juplo.kafka.payment.transfer.domain;
+
+import de.juplo.kafka.payment.transfer.domain.Transfer;
+
+import java.util.Optional;
+import java.util.UUID;
+
+
+public interface TransferRepository
+{
+ void put(Transfer transfer);
+ Optional<Transfer> get(UUID uuid);
+}
--- /dev/null
+package de.juplo.kafka.payment.transfer.domain;
+
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.springframework.http.ResponseEntity;
+
+import java.net.URI;
+import java.util.UUID;
+
+
+@Slf4j
+@RequiredArgsConstructor
+public class TransferService
+{
+ private final TransferRepository repository;
+ private final KafkaProducer<UUID, String> producer;
+ private final ObjectMapper mapper;
+ private final String topic;
+
+ public void initiate(Transfer transfer)
+ {
+ repository
+ .get(transfer.getId())
+ .ifPresentOrElse(
+ stored ->
+ {
+
+ },
+ () ->
+ {
+
+ });
+ try
+ {
+ ProducerRecord<UUID, String> record =
+ new ProducerRecord<>(
+ topic,
+ transfer.getId(),
+ mapper.writeValueAsString(transfer));
+
+ producer.send(record, (metadata, exception) ->
+ {
+ if (exception != null)
+ {
+ log.error("Could not place order {}: {}", transfer, exception.toString());
+ result.setErrorResult(exception);
+ return;
+ }
+
+ result.setResult(ResponseEntity.created(URI.create(path + transfer.getId())).build());
+ });
+ }
+ catch (Exception e)
+ {
+ log.error("Unexpected exception!", e);
+ result.setErrorResult(e);
+ }
+
+ return result;
+ }
+}
+++ /dev/null
-package de.juplo.kafka.payment.transfer.impl;
-
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import de.juplo.kafka.payment.transfer.api.Transfer;
-import de.juplo.kafka.payment.transfer.api.TransferService;
-import de.juplo.kafka.payment.transfer.persistence.TransferRepository;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.springframework.http.ResponseEntity;
-import org.springframework.web.bind.annotation.RequestBody;
-import org.springframework.web.context.request.async.DeferredResult;
-
-import javax.validation.Valid;
-import java.net.URI;
-import java.util.UUID;
-
-
-@Slf4j
-@RequiredArgsConstructor
-public class TransferServiceImpl implements TransferService
-{
- private final TransferRepository repository;
- private final KafkaProducer<UUID, String> producer;
- private final ObjectMapper mapper;
- private final String topic;
-
-
- @Override
- public void initiate(Transfer transfer)
- {
- repository
- .get(transfer.getId())
- .ifPresentOrElse(
- stored ->
- {
-
- },
- () ->
- {
-
- });
- try
- {
- ProducerRecord<UUID, String> record =
- new ProducerRecord<>(
- topic,
- transfer.getId(),
- mapper.writeValueAsString(transfer));
-
- producer.send(record, (metadata, exception) ->
- {
- if (exception != null)
- {
- log.error("Could not place order {}: {}", transfer, exception.toString());
- result.setErrorResult(exception);
- return;
- }
-
- result.setResult(ResponseEntity.created(URI.create(path + transfer.getId())).build());
- });
- }
- catch (Exception e)
- {
- log.error("Unexpected exception!", e);
- result.setErrorResult(e);
- }
-
- return result;
- }
-}
+++ /dev/null
-package de.juplo.kafka.payment.transfer.persistence;
-
-import de.juplo.kafka.payment.transfer.api.Transfer;
-
-import java.util.Optional;
-import java.util.UUID;
-
-
-public interface TransferRepository
-{
- void put(Transfer transfer);
- Optional<Transfer> get(UUID uuid);
-}