From: Kai Moritz Date: Fri, 11 Jun 2021 11:17:02 +0000 (+0200) Subject: MVP for transfer service X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=6191849fee717b080118717c86df79fad12bafc8;p=demos%2Fkafka%2Fdemos-kafka-payment-system-transfer MVP for transfer service --- 6191849fee717b080118717c86df79fad12bafc8 diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..549e00a --- /dev/null +++ b/.gitignore @@ -0,0 +1,33 @@ +HELP.md +target/ +!.mvn/wrapper/maven-wrapper.jar +!**/src/main/**/target/ +!**/src/test/**/target/ + +### STS ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache + +### IntelliJ IDEA ### +.idea +*.iws +*.iml +*.ipr + +### NetBeans ### +/nbproject/private/ +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ +build/ +!**/src/main/**/build/ +!**/src/test/**/build/ + +### VS Code ### +.vscode/ diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..8c2c36b --- /dev/null +++ b/pom.xml @@ -0,0 +1,87 @@ + + + + 4.0.0 + + + org.springframework.boot + spring-boot-starter-parent + 2.5.1 + + + + de.juplo.kafka.payment + transfer + 1.0-SNAPSHOT + Transfer Service + An MVP for the Transfer Service + + + 11 + 6.2.0 + 2.8.0 + + + + + + org.springframework.boot + spring-boot-starter-actuator + + + org.springframework.boot + spring-boot-starter-web + + + org.springframework.boot + spring-boot-starter-validation + + + jakarta.validation + jakarta.validation-api + + + + org.apache.kafka + kafka-clients + + + org.projectlombok + lombok + true + + + + org.springframework.boot + spring-boot-devtools + runtime + true + + + + org.springframework.boot + spring-boot-starter-test + test + + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + org.projectlombok + lombok + + + + + + + + diff --git a/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java b/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java new file mode 100644 index 0000000..65f683c --- /dev/null +++ b/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java @@ -0,0 +1,59 @@ +package de.juplo.kafka.payment.transfer; + + +import com.fasterxml.jackson.databind.ObjectMapper; +import de.juplo.kafka.payment.transfer.adapter.KafkaMessagingService; +import de.juplo.kafka.payment.transfer.domain.TransferService; +import de.juplo.kafka.payment.transfer.ports.MessagingService; +import de.juplo.kafka.payment.transfer.ports.TransferRepository; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; + +import java.util.Properties; + + +@SpringBootApplication +@EnableConfigurationProperties(TransferServiceProperties.class) +@Slf4j +public class TransferServiceApplication +{ + @Bean(destroyMethod = "close") + KafkaProducer producer(TransferServiceProperties properties) + { + Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.bootstrapServers); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + + return new KafkaProducer<>(props); + } + + @Bean + MessagingService kafkaMessagingService( + KafkaProducer producer, + ObjectMapper mapper, + TransferServiceProperties properties) + { + return new KafkaMessagingService(producer, mapper, properties.topic); + } + + @Bean + TransferService transferService( + TransferRepository repository, + MessagingService messagingService) + { + return new TransferService(repository, messagingService); + } + + + public static void main(String[] args) + { + SpringApplication.run(TransferServiceApplication.class, args); + } +} diff --git a/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceProperties.java b/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceProperties.java new file mode 100644 index 0000000..ccd22a3 --- /dev/null +++ b/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceProperties.java @@ -0,0 +1,16 @@ +package de.juplo.kafka.payment.transfer; + + +import lombok.Getter; +import lombok.Setter; +import org.springframework.boot.context.properties.ConfigurationProperties; + + +@ConfigurationProperties("juplo.transfer") +@Getter +@Setter +public class TransferServiceProperties +{ + String bootstrapServers = "localhost:9092"; + String topic = "transfers"; +} diff --git a/src/main/java/de/juplo/kafka/payment/transfer/adapter/KafkaMessagingService.java b/src/main/java/de/juplo/kafka/payment/transfer/adapter/KafkaMessagingService.java new file mode 100644 index 0000000..0f8cf2b --- /dev/null +++ b/src/main/java/de/juplo/kafka/payment/transfer/adapter/KafkaMessagingService.java @@ -0,0 +1,59 @@ +package de.juplo.kafka.payment.transfer.adapter; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import de.juplo.kafka.payment.transfer.domain.Transfer; +import de.juplo.kafka.payment.transfer.ports.MessagingService; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; + +import java.util.concurrent.CompletableFuture; + + +@RequiredArgsConstructor +@Slf4j +public class KafkaMessagingService implements MessagingService +{ + private final KafkaProducer producer; + private final ObjectMapper mapper; + private final String topic; + + + @Override + public CompletableFuture send(Transfer transfer) + { + try + { + CompletableFuture future = new CompletableFuture<>(); + ProducerRecord record = + new ProducerRecord<>( + topic, + Long.toString(transfer.getId()), + mapper.writeValueAsString(transfer)); + + producer.send(record, (metadata, exception) -> + { + if (metadata != null) + { + log.debug("Sent {} to {}/{}:{}", transfer, metadata.topic(), metadata.partition(), metadata.offset()); + future.complete(new TopicPartition(metadata.topic(), metadata.partition())); + } + else + { + log.error("Could not send {}: {}", transfer, exception.getMessage()); + future.completeExceptionally(exception); + } + }); + + return future; + } + catch (JsonProcessingException e) + { + throw new RuntimeException("Could not convert " + transfer, e); + } + } + +} diff --git a/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferController.java b/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferController.java new file mode 100644 index 0000000..e20f9bf --- /dev/null +++ b/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferController.java @@ -0,0 +1,89 @@ +package de.juplo.kafka.payment.transfer.adapter; + + +import de.juplo.kafka.payment.transfer.domain.Transfer; +import de.juplo.kafka.payment.transfer.ports.GetTransferUseCase; +import de.juplo.kafka.payment.transfer.ports.InitiateTransferUseCase; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; +import org.springframework.validation.FieldError; +import org.springframework.web.bind.MethodArgumentNotValidException; +import org.springframework.web.bind.annotation.*; + +import javax.servlet.http.HttpServletRequest; +import javax.validation.Valid; +import java.net.URI; +import java.util.Date; +import java.util.HashMap; +import java.util.Map; + + +@RestController +@RequiredArgsConstructor +@Slf4j + public class TransferController +{ + public final static String PATH = "/transfers"; + + private final InitiateTransferUseCase initiateTransferUseCase; + private final GetTransferUseCase getTransferUseCase; + + + @PostMapping( + path = PATH, + consumes = MediaType.APPLICATION_JSON_VALUE, + produces = MediaType.APPLICATION_JSON_VALUE) + public ResponseEntity transfer(@Valid @RequestBody TransferDTO transferDTO) + { + Transfer transfer = + Transfer + .builder() + .id(transferDTO.getId()) + .payer(transferDTO.getPayer()) + .payee(transferDTO.getPayee()) + .amount(transferDTO.getAmount()) + .build(); + + initiateTransferUseCase.initiate(transfer); + + return ResponseEntity.created(URI.create(PATH + "/" + transferDTO.getId())).build(); + } + + @GetMapping( + path = PATH + "/{id}", + produces = MediaType.APPLICATION_JSON_VALUE) + public ResponseEntity get(@PathVariable Long id) + { + return + getTransferUseCase + .get(id) + .map(transfer -> ResponseEntity.ok(TransferDTO.of(transfer))) + .orElse(ResponseEntity.notFound().build()); + } + + @ResponseStatus(HttpStatus.BAD_REQUEST) + @ExceptionHandler(MethodArgumentNotValidException.class) + public Map handleValidationExceptions( + HttpServletRequest request, + MethodArgumentNotValidException e) + { + Map errorAttributes = new HashMap<>(); + errorAttributes.put("status", HttpStatus.BAD_REQUEST.value()); + errorAttributes.put("error", HttpStatus.BAD_REQUEST.getReasonPhrase()); + errorAttributes.put("path", request.getRequestURI()); + errorAttributes.put("method", request.getMethod()); + errorAttributes.put("timestamp", new Date()); + Map errors = new HashMap<>(); + e.getBindingResult().getAllErrors().forEach((error) -> { + String fieldName = ((FieldError) error).getField(); + String errorMessage = error.getDefaultMessage(); + errors.put(fieldName, errorMessage); + }); + errorAttributes.put("errors", errors); + errorAttributes.put("message", "Validation failed: Invalid message format, error count: " + errors.size()); + return errorAttributes; + } +} diff --git a/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferDTO.java b/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferDTO.java new file mode 100644 index 0000000..1119e72 --- /dev/null +++ b/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferDTO.java @@ -0,0 +1,59 @@ +package de.juplo.kafka.payment.transfer.adapter; + +import de.juplo.kafka.payment.transfer.domain.Transfer; +import lombok.Builder; +import lombok.Data; + +import javax.validation.constraints.Min; +import javax.validation.constraints.NotNull; + + +/** + * Simple DTO used by the REST interface + */ +@Data +@Builder +public class TransferDTO +{ + @NotNull(message = "Cannot be null") + @Min(value = 1, message = "A valid transfer id must be a positive number") + private Long id; + @NotNull(message = "Cannot be null") + @Min(value = 1, message = "A valid bank account id must be a positive number") + private Long payer; + @NotNull(message = "Cannot be null") + @Min(value = 1, message = "A valid bank account id must be a positive number") + private Long payee; + @NotNull(message = "Cannot be null") + @Min(value = 1, message = "The amount of a transfer must be a positv value") + private Integer amount; + + private Transfer.State state; + + + public Transfer toTransfer() + { + return + Transfer + .builder() + .id(id) + .payer(payer) + .payee(payee) + .amount(amount) + .build(); + } + + + public static TransferDTO of(Transfer transfer) + { + return + TransferDTO + .builder() + .id(transfer.getId()) + .payer(transfer.getPayer()) + .payee(transfer.getPayee()) + .amount(transfer.getAmount()) + .state(transfer.getState()) + .build(); + } +} diff --git a/src/main/java/de/juplo/kafka/payment/transfer/domain/Transfer.java b/src/main/java/de/juplo/kafka/payment/transfer/domain/Transfer.java new file mode 100644 index 0000000..5556a1b --- /dev/null +++ b/src/main/java/de/juplo/kafka/payment/transfer/domain/Transfer.java @@ -0,0 +1,29 @@ +package de.juplo.kafka.payment.transfer.domain; + + +import lombok.Builder; +import lombok.Data; +import lombok.EqualsAndHashCode; + + +@Data +@Builder +@EqualsAndHashCode(exclude = "state") +public class Transfer +{ + public enum State + { + SENT, + FAILED, + PENDING, + APPROVED, + REJECTED + } + + private final long id; + private final long payer; + private final long payee; + private final int amount; + + private State state; +} diff --git a/src/main/java/de/juplo/kafka/payment/transfer/domain/TransferService.java b/src/main/java/de/juplo/kafka/payment/transfer/domain/TransferService.java new file mode 100644 index 0000000..3e6265f --- /dev/null +++ b/src/main/java/de/juplo/kafka/payment/transfer/domain/TransferService.java @@ -0,0 +1,74 @@ +package de.juplo.kafka.payment.transfer.domain; + + +import de.juplo.kafka.payment.transfer.ports.GetTransferUseCase; +import de.juplo.kafka.payment.transfer.ports.InitiateTransferUseCase; +import de.juplo.kafka.payment.transfer.ports.MessagingService; +import de.juplo.kafka.payment.transfer.ports.TransferRepository; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import java.util.Optional; + +import static de.juplo.kafka.payment.transfer.domain.Transfer.State.*; + + +@Slf4j +@RequiredArgsConstructor +public class TransferService implements InitiateTransferUseCase, GetTransferUseCase +{ + private final TransferRepository repository; + private final MessagingService messagingService; + + public synchronized void initiate(Transfer transfer) + { + repository + .get(transfer.getId()) + .ifPresentOrElse( + stored -> + { + if (!transfer.equals(stored)) + throw new IllegalArgumentException( + "Re-Initiation of transfer with different data: old=" + + stored + + ", new=" + + transfer); + + if (stored.getState() == FAILED) + { + repository.update(transfer.getId(), FAILED, SENT); + log.info("Resending faild transfer: " + stored); + send(transfer); + } + }, + () -> + { + send(transfer); + transfer.setState(SENT); + repository.store(transfer); + }); + } + + private void send(Transfer transfer) + { + messagingService + .send(transfer) + .thenApply( + $ -> + { + repository.update(transfer.getId(), SENT, PENDING); + return null; + }) + .exceptionally( + e -> + { + repository.update(transfer.getId(), SENT, FAILED); + return null; + }); + } + + public Optional get(Long id) + { + return repository.get(id); + } +} diff --git a/src/main/java/de/juplo/kafka/payment/transfer/persistence/InMemoryTransferRepository.java b/src/main/java/de/juplo/kafka/payment/transfer/persistence/InMemoryTransferRepository.java new file mode 100644 index 0000000..c5af531 --- /dev/null +++ b/src/main/java/de/juplo/kafka/payment/transfer/persistence/InMemoryTransferRepository.java @@ -0,0 +1,85 @@ +package de.juplo.kafka.payment.transfer.persistence; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import de.juplo.kafka.payment.transfer.domain.Transfer; +import de.juplo.kafka.payment.transfer.ports.TransferRepository; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + + +@Component +@RequiredArgsConstructor +@Slf4j +public class InMemoryTransferRepository implements TransferRepository +{ + private final Map map = new HashMap<>(); + private final ObjectMapper mapper; + + + @Override + public synchronized void store(Transfer transfer) + { + Optional + .ofNullable(map.get(transfer.getId())) + .ifPresentOrElse( + json -> + { + throw new IllegalArgumentException("Could not overwrite " + json + " with " + transfer); + }, + () -> put(transfer)); + } + + private void put(Transfer transfer) + { + try + { + map.put(transfer.getId(), mapper.writeValueAsString(transfer)); + } + catch (JsonProcessingException e) + { + log.error("Could not convert Transfer.class: {}", transfer, e); + } + } + + @Override + public synchronized Optional get(Long id) + { + return + Optional + .ofNullable(map.get(id)) + .map(json -> { + try + { + return mapper.readValue(json, Transfer.class); + } + catch (JsonProcessingException e) + { + throw new RuntimeException("Could not convert JSON: " + json, e); + } + }); + } + + @Override + public synchronized void update(Long id, Transfer.State oldState, Transfer.State newState) + { + Transfer transfer = get(id).orElseThrow(() -> new IllegalArgumentException("Could not find transfer " + id)); + + if (transfer.getState() != oldState) + throw new IllegalArgumentException(("Unexpectd state for " + transfer + ", expected: " + oldState)); + + transfer.setState(newState); + put(transfer); + } + + @Override + public void remove(Long id) + { + map.remove(id); + } +} diff --git a/src/main/java/de/juplo/kafka/payment/transfer/ports/GetTransferUseCase.java b/src/main/java/de/juplo/kafka/payment/transfer/ports/GetTransferUseCase.java new file mode 100644 index 0000000..3857a5f --- /dev/null +++ b/src/main/java/de/juplo/kafka/payment/transfer/ports/GetTransferUseCase.java @@ -0,0 +1,11 @@ +package de.juplo.kafka.payment.transfer.ports; + +import de.juplo.kafka.payment.transfer.domain.Transfer; + +import java.util.Optional; + + +public interface GetTransferUseCase +{ + Optional get(Long id); +} diff --git a/src/main/java/de/juplo/kafka/payment/transfer/ports/InitiateTransferUseCase.java b/src/main/java/de/juplo/kafka/payment/transfer/ports/InitiateTransferUseCase.java new file mode 100644 index 0000000..b7dfc64 --- /dev/null +++ b/src/main/java/de/juplo/kafka/payment/transfer/ports/InitiateTransferUseCase.java @@ -0,0 +1,9 @@ +package de.juplo.kafka.payment.transfer.ports; + +import de.juplo.kafka.payment.transfer.domain.Transfer; + + +public interface InitiateTransferUseCase +{ + void initiate(Transfer transfer); +} diff --git a/src/main/java/de/juplo/kafka/payment/transfer/ports/MessagingService.java b/src/main/java/de/juplo/kafka/payment/transfer/ports/MessagingService.java new file mode 100644 index 0000000..81e7555 --- /dev/null +++ b/src/main/java/de/juplo/kafka/payment/transfer/ports/MessagingService.java @@ -0,0 +1,11 @@ +package de.juplo.kafka.payment.transfer.ports; + +import de.juplo.kafka.payment.transfer.domain.Transfer; + +import java.util.concurrent.CompletableFuture; + + +public interface MessagingService +{ + CompletableFuture send(Transfer transfer); +} diff --git a/src/main/java/de/juplo/kafka/payment/transfer/ports/TransferRepository.java b/src/main/java/de/juplo/kafka/payment/transfer/ports/TransferRepository.java new file mode 100644 index 0000000..2423ab3 --- /dev/null +++ b/src/main/java/de/juplo/kafka/payment/transfer/ports/TransferRepository.java @@ -0,0 +1,17 @@ +package de.juplo.kafka.payment.transfer.ports; + +import de.juplo.kafka.payment.transfer.domain.Transfer; + +import java.util.Optional; + + +public interface TransferRepository +{ + void store(Transfer transfer); + + Optional get(Long id); + + void update(Long id, Transfer.State oldState, Transfer.State newState) throws IllegalArgumentException; + + void remove(Long id); +} diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties new file mode 100644 index 0000000..7fdbdce --- /dev/null +++ b/src/main/resources/application.properties @@ -0,0 +1,3 @@ +management.endpoints.web.exposure.include=* + +logging.level.de.juplo=debug diff --git a/src/test/java/de/juplo/kafka/payment/transfer/TransferServiceApplicationTests.java b/src/test/java/de/juplo/kafka/payment/transfer/TransferServiceApplicationTests.java new file mode 100644 index 0000000..39fb248 --- /dev/null +++ b/src/test/java/de/juplo/kafka/payment/transfer/TransferServiceApplicationTests.java @@ -0,0 +1,13 @@ +package de.juplo.kafka.payment.transfer; + +import org.junit.jupiter.api.Test; +import org.springframework.boot.test.context.SpringBootTest; + +@SpringBootTest +class TransferServiceApplicationTests +{ + @Test + void contextLoads() + { + } +} diff --git a/src/test/java/de/juplo/kafka/payment/transfer/domain/TransferTest.java b/src/test/java/de/juplo/kafka/payment/transfer/domain/TransferTest.java new file mode 100644 index 0000000..55f6c03 --- /dev/null +++ b/src/test/java/de/juplo/kafka/payment/transfer/domain/TransferTest.java @@ -0,0 +1,20 @@ +package de.juplo.kafka.payment.transfer.domain; + +import org.junit.jupiter.api.Test; + +import static de.juplo.kafka.payment.transfer.domain.Transfer.State.PENDING; +import static de.juplo.kafka.payment.transfer.domain.Transfer.State.SENT; +import static org.assertj.core.api.Assertions.assertThat; + + +public class TransferTest +{ + @Test + public void testEqualsIgnoresState() + { + Transfer a = Transfer.builder().id(1).payer(1).payee(1).amount(1).state(SENT).build(); + Transfer b = Transfer.builder().id(1).payer(1).payee(1).amount(1).state(PENDING).build(); + + assertThat(a).isEqualTo(b); + } +}