From: Kai Moritz Date: Sun, 13 Jun 2021 08:48:28 +0000 (+0200) Subject: WIP X-Git-Tag: wip-initialer-commit X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=7625c79fc69130bc68f906e4d46682d1d0ad4663;p=demos%2Fkafka%2Fdemos-kafka-payment-system-setup WIP --- diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..1ad9963 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,2 @@ +* +!target/*.jar diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..549e00a --- /dev/null +++ b/.gitignore @@ -0,0 +1,33 @@ +HELP.md +target/ +!.mvn/wrapper/maven-wrapper.jar +!**/src/main/**/target/ +!**/src/test/**/target/ + +### STS ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache + +### IntelliJ IDEA ### +.idea +*.iws +*.iml +*.ipr + +### NetBeans ### +/nbproject/private/ +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ +build/ +!**/src/main/**/build/ +!**/src/test/**/build/ + +### VS Code ### +.vscode/ diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..cd6a95b --- /dev/null +++ b/Dockerfile @@ -0,0 +1,4 @@ +FROM openjdk:8-jre-slim +COPY target/take-order-01-0-SNAPSHOT.jar /opt/ +EXPOSE 8080 +CMD ["java", "-jar", "/opt/take-order-01-0-SNAPSHOT.jar"] diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..8a26995 --- /dev/null +++ b/pom.xml @@ -0,0 +1,78 @@ + + + 4.0.0 + + org.springframework.boot + spring-boot-starter-parent + 2.5.1 + + + de.juplo.kafka.payment + transfer + 1.0-SNAPSHOT + Transfer Service + An MVP for the Transfer Service + + 11 + 6.2.0 + 2.8.0 + + + + org.springframework.boot + spring-boot-starter-actuator + + + org.springframework.boot + spring-boot-starter-web + + + org.springframework.boot + spring-boot-starter-validation + + + + org.apache.kafka + kafka-clients + + + org.springframework.boot + spring-boot-devtools + runtime + true + + + jakarta.validation + jakarta.validation-api + + + org.projectlombok + lombok + true + + + org.springframework.boot + spring-boot-starter-test + test + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + org.projectlombok + lombok + + + + + + + + diff --git a/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java b/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java new file mode 100644 index 0000000..320b841 --- /dev/null +++ b/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java @@ -0,0 +1,51 @@ +package de.juplo.kafka.payment.transfer; + + +import com.fasterxml.jackson.databind.ObjectMapper; +import de.juplo.kafka.payment.transfer.domain.TransferRepository; +import de.juplo.kafka.payment.transfer.domain.TransferService; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; + +import java.util.Properties; +import java.util.UUID; + + +@SpringBootApplication +@EnableConfigurationProperties(TransferServiceProperties.class) +@Slf4j +public class TransferServiceApplication +{ + @Bean(destroyMethod = "close") + KafkaProducer producer(TransferServiceProperties properties) + { + Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.bootstrapServers); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + + return new KafkaProducer<>(props); + } + + @Bean + TransferService transferService( + TransferRepository repository, + KafkaProducer producer, + ObjectMapper mapper, + TransferServiceProperties properties) + { + return new TransferService(repository, producer, mapper, properties.topic); + } + + + public static void main(String[] args) + { + SpringApplication.run(TransferServiceApplication.class, args); + } +} diff --git a/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceProperties.java b/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceProperties.java new file mode 100644 index 0000000..ccd22a3 --- /dev/null +++ b/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceProperties.java @@ -0,0 +1,16 @@ +package de.juplo.kafka.payment.transfer; + + +import lombok.Getter; +import lombok.Setter; +import org.springframework.boot.context.properties.ConfigurationProperties; + + +@ConfigurationProperties("juplo.transfer") +@Getter +@Setter +public class TransferServiceProperties +{ + String bootstrapServers = "localhost:9092"; + String topic = "transfers"; +} diff --git a/src/main/java/de/juplo/kafka/payment/transfer/controller/TransferController.java b/src/main/java/de/juplo/kafka/payment/transfer/controller/TransferController.java new file mode 100644 index 0000000..2d83ad9 --- /dev/null +++ b/src/main/java/de/juplo/kafka/payment/transfer/controller/TransferController.java @@ -0,0 +1,88 @@ +package de.juplo.kafka.payment.transfer.controller; + + +import de.juplo.kafka.payment.transfer.domain.Transfer; +import de.juplo.kafka.payment.transfer.domain.TransferService; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; +import org.springframework.validation.FieldError; +import org.springframework.web.bind.MethodArgumentNotValidException; +import org.springframework.web.bind.annotation.*; + +import javax.servlet.http.HttpServletRequest; +import javax.validation.Valid; +import java.net.URI; +import java.util.Date; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + + +@RestController +@RequiredArgsConstructor +@Slf4j + public class TransferController +{ + public final static String PATH = "/transfers"; + + private final TransferService service; + + + @PostMapping( + path = PATH, + consumes = MediaType.APPLICATION_JSON_VALUE, + produces = MediaType.APPLICATION_JSON_VALUE) + public ResponseEntity transfer(@Valid @RequestBody TransferDTO transferDTO) + { + Transfer transfer = + Transfer + .builder() + .id(transferDTO.getId()) + .payer(transferDTO.getPayer()) + .payee(transferDTO.getPayee()) + .amount(transferDTO.getAmount()) + .build(); + + service.initiate(transfer); + + return ResponseEntity.created(URI.create(PATH + "/" + transferDTO.getId())).build(); + } + + @GetMapping( + path = PATH + "/{id}", + produces = MediaType.APPLICATION_JSON_VALUE) + public ResponseEntity get(@PathVariable Long id) + { + return + service + .get(id) + .map(transfer -> ResponseEntity.ok(TransferDTO.of(transfer))) + .orElse(ResponseEntity.notFound().build()); + } + + @ResponseStatus(HttpStatus.BAD_REQUEST) + @ExceptionHandler(MethodArgumentNotValidException.class) + public Map handleValidationExceptions( + HttpServletRequest request, + MethodArgumentNotValidException e) + { + Map errorAttributes = new HashMap<>(); + errorAttributes.put("status", HttpStatus.BAD_REQUEST.value()); + errorAttributes.put("error", HttpStatus.BAD_REQUEST.getReasonPhrase()); + errorAttributes.put("path", request.getRequestURI()); + errorAttributes.put("method", request.getMethod()); + errorAttributes.put("timestamp", new Date()); + Map errors = new HashMap<>(); + e.getBindingResult().getAllErrors().forEach((error) -> { + String fieldName = ((FieldError) error).getField(); + String errorMessage = error.getDefaultMessage(); + errors.put(fieldName, errorMessage); + }); + errorAttributes.put("errors", errors); + errorAttributes.put("message", "Validation failed: Invalid message format, error count: " + errors.size()); + return errorAttributes; + } +} diff --git a/src/main/java/de/juplo/kafka/payment/transfer/controller/TransferDTO.java b/src/main/java/de/juplo/kafka/payment/transfer/controller/TransferDTO.java new file mode 100644 index 0000000..ad4f57d --- /dev/null +++ b/src/main/java/de/juplo/kafka/payment/transfer/controller/TransferDTO.java @@ -0,0 +1,59 @@ +package de.juplo.kafka.payment.transfer.controller; + +import de.juplo.kafka.payment.transfer.domain.Transfer; +import lombok.Builder; +import lombok.Data; + +import javax.validation.constraints.Min; +import javax.validation.constraints.NotNull; + + +/** + * Simple DTO used by the REST interface + */ +@Data +@Builder +public class TransferDTO +{ + @NotNull(message = "Cannot be null") + @Min(value = 1, message = "A valid transfer id must be a positive number") + private Long id; + @NotNull(message = "Cannot be null") + @Min(value = 1, message = "A valid bank account id must be a positive number") + private Long payer; + @NotNull(message = "Cannot be null") + @Min(value = 1, message = "A valid bank account id must be a positive number") + private Long payee; + @NotNull(message = "Cannot be null") + @Min(value = 1, message = "The amount of a transfer must be a positv value") + private Integer amount; + + private Transfer.State state; + + + public Transfer toTransfer() + { + return + Transfer + .builder() + .id(id) + .payer(payer) + .payee(payee) + .amount(amount) + .build(); + } + + + public static TransferDTO of(Transfer transfer) + { + return + TransferDTO + .builder() + .id(transfer.getId()) + .payer(transfer.getPayer()) + .payee(transfer.getPayee()) + .amount(transfer.getAmount()) + .state(transfer.getState()) + .build(); + } +} diff --git a/src/main/java/de/juplo/kafka/payment/transfer/domain/Transfer.java b/src/main/java/de/juplo/kafka/payment/transfer/domain/Transfer.java new file mode 100644 index 0000000..5556a1b --- /dev/null +++ b/src/main/java/de/juplo/kafka/payment/transfer/domain/Transfer.java @@ -0,0 +1,29 @@ +package de.juplo.kafka.payment.transfer.domain; + + +import lombok.Builder; +import lombok.Data; +import lombok.EqualsAndHashCode; + + +@Data +@Builder +@EqualsAndHashCode(exclude = "state") +public class Transfer +{ + public enum State + { + SENT, + FAILED, + PENDING, + APPROVED, + REJECTED + } + + private final long id; + private final long payer; + private final long payee; + private final int amount; + + private State state; +} diff --git a/src/main/java/de/juplo/kafka/payment/transfer/domain/TransferRepository.java b/src/main/java/de/juplo/kafka/payment/transfer/domain/TransferRepository.java new file mode 100644 index 0000000..36d027c --- /dev/null +++ b/src/main/java/de/juplo/kafka/payment/transfer/domain/TransferRepository.java @@ -0,0 +1,15 @@ +package de.juplo.kafka.payment.transfer.domain; + +import java.util.Optional; + + +public interface TransferRepository +{ + void store(Transfer transfer); + + Optional get(Long id); + + void update(Long id, Transfer.State oldState, Transfer.State newState) throws IllegalArgumentException; + + void remove(Long id); +} diff --git a/src/main/java/de/juplo/kafka/payment/transfer/domain/TransferService.java b/src/main/java/de/juplo/kafka/payment/transfer/domain/TransferService.java new file mode 100644 index 0000000..d708826 --- /dev/null +++ b/src/main/java/de/juplo/kafka/payment/transfer/domain/TransferService.java @@ -0,0 +1,89 @@ +package de.juplo.kafka.payment.transfer.domain; + + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; + +import java.util.Optional; + +import static de.juplo.kafka.payment.transfer.domain.Transfer.State.*; + + +@Slf4j +@RequiredArgsConstructor +public class TransferService +{ + private final TransferRepository repository; + private final KafkaProducer producer; + private final ObjectMapper mapper; + private final String topic; + + public synchronized void initiate(Transfer transfer) + { + repository + .get(transfer.getId()) + .ifPresentOrElse( + stored -> + { + if (!transfer.equals(stored)) + throw new IllegalArgumentException( + "Re-Initiation of transfer with different data: old=" + + stored + + ", new=" + + transfer); + + if (stored.getState() == FAILED) + { + repository.update(transfer.getId(), FAILED, SENT); + log.info("Resending faild transfer: " + stored); + send(transfer); + } + }, + () -> + { + send(transfer); + transfer.setState(SENT); + repository.store(transfer); + }); + } + + + private void send(Transfer transfer) + { + try + { + ProducerRecord record = + new ProducerRecord<>( + topic, + Long.toString(transfer.getId()), + mapper.writeValueAsString(transfer)); + + producer.send(record, (metadata, exception) -> + { + if (metadata != null) + { + log.debug("Sent {} to {}/{}:{}", transfer, metadata.topic(), metadata.partition(), metadata.offset()); + repository.update(transfer.getId(), SENT, PENDING); + } + else + { + log.error("Could not send {}: {}", transfer, exception.getMessage()); + repository.update(transfer.getId(), SENT, FAILED); + } + }); + } + catch (JsonProcessingException e) + { + throw new RuntimeException("Could not convert " + transfer, e); + } + } + + public Optional get(Long id) + { + return repository.get(id); + } +} diff --git a/src/main/java/de/juplo/kafka/payment/transfer/persistence/InMemoryTransferRepository.java b/src/main/java/de/juplo/kafka/payment/transfer/persistence/InMemoryTransferRepository.java new file mode 100644 index 0000000..5ef2094 --- /dev/null +++ b/src/main/java/de/juplo/kafka/payment/transfer/persistence/InMemoryTransferRepository.java @@ -0,0 +1,81 @@ +package de.juplo.kafka.payment.transfer.persistence; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import de.juplo.kafka.payment.transfer.domain.Transfer; +import de.juplo.kafka.payment.transfer.domain.TransferRepository; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + + +@Component +@RequiredArgsConstructor +@Slf4j +public class InMemoryTransferRepository implements TransferRepository +{ + private final Map map = new HashMap<>(); + private final ObjectMapper mapper; + + + @Override + public synchronized void store(Transfer transfer) + { + Optional + .ofNullable(map.get(transfer.getId())) + .ifPresent( + json -> + { + throw new IllegalArgumentException("Could not overwrite " + json + " with " + transfer); + }); + + try + { + map.put(transfer.getId(), mapper.writeValueAsString(transfer)); + } + catch (JsonProcessingException e) + { + log.error("Could not convert Transfer.class: {}", transfer, e); + } + } + + @Override + public synchronized Optional get(Long id) + { + return + Optional + .ofNullable(map.get(id)) + .map(json -> { + try + { + return mapper.readValue(json, Transfer.class); + } + catch (JsonProcessingException e) + { + throw new RuntimeException("Could not convert JSON: " + json, e); + } + }); + } + + @Override + public synchronized void update(Long id, Transfer.State oldState, Transfer.State newState) + { + Transfer transfer = get(id).orElseThrow(() -> new IllegalArgumentException("Could not find transfer " + id)); + + if (transfer.getState() != oldState) + throw new IllegalArgumentException(("Unexpectd state for " + transfer + ", expected: " + oldState)); + + transfer.setState(newState); + store(transfer); + } + + @Override + public void remove(Long id) + { + map.remove(id); + } +} diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties new file mode 100644 index 0000000..f22f985 --- /dev/null +++ b/src/main/resources/application.properties @@ -0,0 +1,3 @@ +management.endpoints.web.exposure.include=* + +logging.level.de.trion=info diff --git a/src/test/java/de/juplo/kafka/payment/transfer/TransferServiceApplicationTests.java b/src/test/java/de/juplo/kafka/payment/transfer/TransferServiceApplicationTests.java new file mode 100644 index 0000000..29b0d02 --- /dev/null +++ b/src/test/java/de/juplo/kafka/payment/transfer/TransferServiceApplicationTests.java @@ -0,0 +1,13 @@ +package de.juplo.kafka.payment.transfer; + +import org.junit.jupiter.api.Test; +import org.springframework.boot.test.context.SpringBootTest; + +@SpringBootTest +class TransferServiceApplicationTests { + + @Test + void contextLoads() { + } + +} diff --git a/transfer/.dockerignore b/transfer/.dockerignore deleted file mode 100644 index 1ad9963..0000000 --- a/transfer/.dockerignore +++ /dev/null @@ -1,2 +0,0 @@ -* -!target/*.jar diff --git a/transfer/.gitignore b/transfer/.gitignore deleted file mode 100644 index 549e00a..0000000 --- a/transfer/.gitignore +++ /dev/null @@ -1,33 +0,0 @@ -HELP.md -target/ -!.mvn/wrapper/maven-wrapper.jar -!**/src/main/**/target/ -!**/src/test/**/target/ - -### STS ### -.apt_generated -.classpath -.factorypath -.project -.settings -.springBeans -.sts4-cache - -### IntelliJ IDEA ### -.idea -*.iws -*.iml -*.ipr - -### NetBeans ### -/nbproject/private/ -/nbbuild/ -/dist/ -/nbdist/ -/.nb-gradle/ -build/ -!**/src/main/**/build/ -!**/src/test/**/build/ - -### VS Code ### -.vscode/ diff --git a/transfer/Dockerfile b/transfer/Dockerfile deleted file mode 100644 index cd6a95b..0000000 --- a/transfer/Dockerfile +++ /dev/null @@ -1,4 +0,0 @@ -FROM openjdk:8-jre-slim -COPY target/take-order-01-0-SNAPSHOT.jar /opt/ -EXPOSE 8080 -CMD ["java", "-jar", "/opt/take-order-01-0-SNAPSHOT.jar"] diff --git a/transfer/pom.xml b/transfer/pom.xml deleted file mode 100644 index 8a26995..0000000 --- a/transfer/pom.xml +++ /dev/null @@ -1,78 +0,0 @@ - - - 4.0.0 - - org.springframework.boot - spring-boot-starter-parent - 2.5.1 - - - de.juplo.kafka.payment - transfer - 1.0-SNAPSHOT - Transfer Service - An MVP for the Transfer Service - - 11 - 6.2.0 - 2.8.0 - - - - org.springframework.boot - spring-boot-starter-actuator - - - org.springframework.boot - spring-boot-starter-web - - - org.springframework.boot - spring-boot-starter-validation - - - - org.apache.kafka - kafka-clients - - - org.springframework.boot - spring-boot-devtools - runtime - true - - - jakarta.validation - jakarta.validation-api - - - org.projectlombok - lombok - true - - - org.springframework.boot - spring-boot-starter-test - test - - - - - - - org.springframework.boot - spring-boot-maven-plugin - - - - org.projectlombok - lombok - - - - - - - - diff --git a/transfer/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java b/transfer/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java deleted file mode 100644 index 320b841..0000000 --- a/transfer/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java +++ /dev/null @@ -1,51 +0,0 @@ -package de.juplo.kafka.payment.transfer; - - -import com.fasterxml.jackson.databind.ObjectMapper; -import de.juplo.kafka.payment.transfer.domain.TransferRepository; -import de.juplo.kafka.payment.transfer.domain.TransferService; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.serialization.StringSerializer; -import org.springframework.boot.SpringApplication; -import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.context.annotation.Bean; - -import java.util.Properties; -import java.util.UUID; - - -@SpringBootApplication -@EnableConfigurationProperties(TransferServiceProperties.class) -@Slf4j -public class TransferServiceApplication -{ - @Bean(destroyMethod = "close") - KafkaProducer producer(TransferServiceProperties properties) - { - Properties props = new Properties(); - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.bootstrapServers); - props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - - return new KafkaProducer<>(props); - } - - @Bean - TransferService transferService( - TransferRepository repository, - KafkaProducer producer, - ObjectMapper mapper, - TransferServiceProperties properties) - { - return new TransferService(repository, producer, mapper, properties.topic); - } - - - public static void main(String[] args) - { - SpringApplication.run(TransferServiceApplication.class, args); - } -} diff --git a/transfer/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceProperties.java b/transfer/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceProperties.java deleted file mode 100644 index ccd22a3..0000000 --- a/transfer/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceProperties.java +++ /dev/null @@ -1,16 +0,0 @@ -package de.juplo.kafka.payment.transfer; - - -import lombok.Getter; -import lombok.Setter; -import org.springframework.boot.context.properties.ConfigurationProperties; - - -@ConfigurationProperties("juplo.transfer") -@Getter -@Setter -public class TransferServiceProperties -{ - String bootstrapServers = "localhost:9092"; - String topic = "transfers"; -} diff --git a/transfer/src/main/java/de/juplo/kafka/payment/transfer/controller/TransferController.java b/transfer/src/main/java/de/juplo/kafka/payment/transfer/controller/TransferController.java deleted file mode 100644 index 2d83ad9..0000000 --- a/transfer/src/main/java/de/juplo/kafka/payment/transfer/controller/TransferController.java +++ /dev/null @@ -1,88 +0,0 @@ -package de.juplo.kafka.payment.transfer.controller; - - -import de.juplo.kafka.payment.transfer.domain.Transfer; -import de.juplo.kafka.payment.transfer.domain.TransferService; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.springframework.http.HttpStatus; -import org.springframework.http.MediaType; -import org.springframework.http.ResponseEntity; -import org.springframework.validation.FieldError; -import org.springframework.web.bind.MethodArgumentNotValidException; -import org.springframework.web.bind.annotation.*; - -import javax.servlet.http.HttpServletRequest; -import javax.validation.Valid; -import java.net.URI; -import java.util.Date; -import java.util.HashMap; -import java.util.Map; -import java.util.Optional; - - -@RestController -@RequiredArgsConstructor -@Slf4j - public class TransferController -{ - public final static String PATH = "/transfers"; - - private final TransferService service; - - - @PostMapping( - path = PATH, - consumes = MediaType.APPLICATION_JSON_VALUE, - produces = MediaType.APPLICATION_JSON_VALUE) - public ResponseEntity transfer(@Valid @RequestBody TransferDTO transferDTO) - { - Transfer transfer = - Transfer - .builder() - .id(transferDTO.getId()) - .payer(transferDTO.getPayer()) - .payee(transferDTO.getPayee()) - .amount(transferDTO.getAmount()) - .build(); - - service.initiate(transfer); - - return ResponseEntity.created(URI.create(PATH + "/" + transferDTO.getId())).build(); - } - - @GetMapping( - path = PATH + "/{id}", - produces = MediaType.APPLICATION_JSON_VALUE) - public ResponseEntity get(@PathVariable Long id) - { - return - service - .get(id) - .map(transfer -> ResponseEntity.ok(TransferDTO.of(transfer))) - .orElse(ResponseEntity.notFound().build()); - } - - @ResponseStatus(HttpStatus.BAD_REQUEST) - @ExceptionHandler(MethodArgumentNotValidException.class) - public Map handleValidationExceptions( - HttpServletRequest request, - MethodArgumentNotValidException e) - { - Map errorAttributes = new HashMap<>(); - errorAttributes.put("status", HttpStatus.BAD_REQUEST.value()); - errorAttributes.put("error", HttpStatus.BAD_REQUEST.getReasonPhrase()); - errorAttributes.put("path", request.getRequestURI()); - errorAttributes.put("method", request.getMethod()); - errorAttributes.put("timestamp", new Date()); - Map errors = new HashMap<>(); - e.getBindingResult().getAllErrors().forEach((error) -> { - String fieldName = ((FieldError) error).getField(); - String errorMessage = error.getDefaultMessage(); - errors.put(fieldName, errorMessage); - }); - errorAttributes.put("errors", errors); - errorAttributes.put("message", "Validation failed: Invalid message format, error count: " + errors.size()); - return errorAttributes; - } -} diff --git a/transfer/src/main/java/de/juplo/kafka/payment/transfer/controller/TransferDTO.java b/transfer/src/main/java/de/juplo/kafka/payment/transfer/controller/TransferDTO.java deleted file mode 100644 index ad4f57d..0000000 --- a/transfer/src/main/java/de/juplo/kafka/payment/transfer/controller/TransferDTO.java +++ /dev/null @@ -1,59 +0,0 @@ -package de.juplo.kafka.payment.transfer.controller; - -import de.juplo.kafka.payment.transfer.domain.Transfer; -import lombok.Builder; -import lombok.Data; - -import javax.validation.constraints.Min; -import javax.validation.constraints.NotNull; - - -/** - * Simple DTO used by the REST interface - */ -@Data -@Builder -public class TransferDTO -{ - @NotNull(message = "Cannot be null") - @Min(value = 1, message = "A valid transfer id must be a positive number") - private Long id; - @NotNull(message = "Cannot be null") - @Min(value = 1, message = "A valid bank account id must be a positive number") - private Long payer; - @NotNull(message = "Cannot be null") - @Min(value = 1, message = "A valid bank account id must be a positive number") - private Long payee; - @NotNull(message = "Cannot be null") - @Min(value = 1, message = "The amount of a transfer must be a positv value") - private Integer amount; - - private Transfer.State state; - - - public Transfer toTransfer() - { - return - Transfer - .builder() - .id(id) - .payer(payer) - .payee(payee) - .amount(amount) - .build(); - } - - - public static TransferDTO of(Transfer transfer) - { - return - TransferDTO - .builder() - .id(transfer.getId()) - .payer(transfer.getPayer()) - .payee(transfer.getPayee()) - .amount(transfer.getAmount()) - .state(transfer.getState()) - .build(); - } -} diff --git a/transfer/src/main/java/de/juplo/kafka/payment/transfer/domain/Transfer.java b/transfer/src/main/java/de/juplo/kafka/payment/transfer/domain/Transfer.java deleted file mode 100644 index 5556a1b..0000000 --- a/transfer/src/main/java/de/juplo/kafka/payment/transfer/domain/Transfer.java +++ /dev/null @@ -1,29 +0,0 @@ -package de.juplo.kafka.payment.transfer.domain; - - -import lombok.Builder; -import lombok.Data; -import lombok.EqualsAndHashCode; - - -@Data -@Builder -@EqualsAndHashCode(exclude = "state") -public class Transfer -{ - public enum State - { - SENT, - FAILED, - PENDING, - APPROVED, - REJECTED - } - - private final long id; - private final long payer; - private final long payee; - private final int amount; - - private State state; -} diff --git a/transfer/src/main/java/de/juplo/kafka/payment/transfer/domain/TransferRepository.java b/transfer/src/main/java/de/juplo/kafka/payment/transfer/domain/TransferRepository.java deleted file mode 100644 index 36d027c..0000000 --- a/transfer/src/main/java/de/juplo/kafka/payment/transfer/domain/TransferRepository.java +++ /dev/null @@ -1,15 +0,0 @@ -package de.juplo.kafka.payment.transfer.domain; - -import java.util.Optional; - - -public interface TransferRepository -{ - void store(Transfer transfer); - - Optional get(Long id); - - void update(Long id, Transfer.State oldState, Transfer.State newState) throws IllegalArgumentException; - - void remove(Long id); -} diff --git a/transfer/src/main/java/de/juplo/kafka/payment/transfer/domain/TransferService.java b/transfer/src/main/java/de/juplo/kafka/payment/transfer/domain/TransferService.java deleted file mode 100644 index d708826..0000000 --- a/transfer/src/main/java/de/juplo/kafka/payment/transfer/domain/TransferService.java +++ /dev/null @@ -1,89 +0,0 @@ -package de.juplo.kafka.payment.transfer.domain; - - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerRecord; - -import java.util.Optional; - -import static de.juplo.kafka.payment.transfer.domain.Transfer.State.*; - - -@Slf4j -@RequiredArgsConstructor -public class TransferService -{ - private final TransferRepository repository; - private final KafkaProducer producer; - private final ObjectMapper mapper; - private final String topic; - - public synchronized void initiate(Transfer transfer) - { - repository - .get(transfer.getId()) - .ifPresentOrElse( - stored -> - { - if (!transfer.equals(stored)) - throw new IllegalArgumentException( - "Re-Initiation of transfer with different data: old=" + - stored + - ", new=" + - transfer); - - if (stored.getState() == FAILED) - { - repository.update(transfer.getId(), FAILED, SENT); - log.info("Resending faild transfer: " + stored); - send(transfer); - } - }, - () -> - { - send(transfer); - transfer.setState(SENT); - repository.store(transfer); - }); - } - - - private void send(Transfer transfer) - { - try - { - ProducerRecord record = - new ProducerRecord<>( - topic, - Long.toString(transfer.getId()), - mapper.writeValueAsString(transfer)); - - producer.send(record, (metadata, exception) -> - { - if (metadata != null) - { - log.debug("Sent {} to {}/{}:{}", transfer, metadata.topic(), metadata.partition(), metadata.offset()); - repository.update(transfer.getId(), SENT, PENDING); - } - else - { - log.error("Could not send {}: {}", transfer, exception.getMessage()); - repository.update(transfer.getId(), SENT, FAILED); - } - }); - } - catch (JsonProcessingException e) - { - throw new RuntimeException("Could not convert " + transfer, e); - } - } - - public Optional get(Long id) - { - return repository.get(id); - } -} diff --git a/transfer/src/main/java/de/juplo/kafka/payment/transfer/persistence/InMemoryTransferRepository.java b/transfer/src/main/java/de/juplo/kafka/payment/transfer/persistence/InMemoryTransferRepository.java deleted file mode 100644 index 5ef2094..0000000 --- a/transfer/src/main/java/de/juplo/kafka/payment/transfer/persistence/InMemoryTransferRepository.java +++ /dev/null @@ -1,81 +0,0 @@ -package de.juplo.kafka.payment.transfer.persistence; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import de.juplo.kafka.payment.transfer.domain.Transfer; -import de.juplo.kafka.payment.transfer.domain.TransferRepository; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Component; - -import java.util.HashMap; -import java.util.Map; -import java.util.Optional; - - -@Component -@RequiredArgsConstructor -@Slf4j -public class InMemoryTransferRepository implements TransferRepository -{ - private final Map map = new HashMap<>(); - private final ObjectMapper mapper; - - - @Override - public synchronized void store(Transfer transfer) - { - Optional - .ofNullable(map.get(transfer.getId())) - .ifPresent( - json -> - { - throw new IllegalArgumentException("Could not overwrite " + json + " with " + transfer); - }); - - try - { - map.put(transfer.getId(), mapper.writeValueAsString(transfer)); - } - catch (JsonProcessingException e) - { - log.error("Could not convert Transfer.class: {}", transfer, e); - } - } - - @Override - public synchronized Optional get(Long id) - { - return - Optional - .ofNullable(map.get(id)) - .map(json -> { - try - { - return mapper.readValue(json, Transfer.class); - } - catch (JsonProcessingException e) - { - throw new RuntimeException("Could not convert JSON: " + json, e); - } - }); - } - - @Override - public synchronized void update(Long id, Transfer.State oldState, Transfer.State newState) - { - Transfer transfer = get(id).orElseThrow(() -> new IllegalArgumentException("Could not find transfer " + id)); - - if (transfer.getState() != oldState) - throw new IllegalArgumentException(("Unexpectd state for " + transfer + ", expected: " + oldState)); - - transfer.setState(newState); - store(transfer); - } - - @Override - public void remove(Long id) - { - map.remove(id); - } -} diff --git a/transfer/src/main/resources/application.properties b/transfer/src/main/resources/application.properties deleted file mode 100644 index f22f985..0000000 --- a/transfer/src/main/resources/application.properties +++ /dev/null @@ -1,3 +0,0 @@ -management.endpoints.web.exposure.include=* - -logging.level.de.trion=info diff --git a/transfer/src/test/java/de/juplo/kafka/payment/transfer/TransferServiceApplicationTests.java b/transfer/src/test/java/de/juplo/kafka/payment/transfer/TransferServiceApplicationTests.java deleted file mode 100644 index 29b0d02..0000000 --- a/transfer/src/test/java/de/juplo/kafka/payment/transfer/TransferServiceApplicationTests.java +++ /dev/null @@ -1,13 +0,0 @@ -package de.juplo.kafka.payment.transfer; - -import org.junit.jupiter.api.Test; -import org.springframework.boot.test.context.SpringBootTest; - -@SpringBootTest -class TransferServiceApplicationTests { - - @Test - void contextLoads() { - } - -}