From: Kai Moritz Date: Sun, 13 Jun 2021 12:11:33 +0000 (+0200) Subject: WIP X-Git-Tag: wip-setup-initial~4 X-Git-Url: https://juplo.de/gitweb/?p=demos%2Fkafka%2Fdemos-kafka-payment-system-setup;a=commitdiff_plain;h=0d818df81100873fc9ab174c6047d64e30b18386 WIP --- diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000..b07e669 --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "transfer"] + path = transfer + url = ../transfer diff --git a/README.sh b/README.sh index 1d5eb98..67df46e 100755 --- a/README.sh +++ b/README.sh @@ -4,34 +4,33 @@ if [ "$1" = "cleanup" ] then docker-compose down mvn clean - rm -rvf */src/main/java/de/trion/microservices/avro exit fi mvn package || exit 1 -if [ ! -e take-order/target/BUILD ] || [ "$(find take-order/target/classes/ -anewer take-order/target/BUILD | grep take-order/target/classes/de )" != "" ] +if [ ! -e transfer/target/BUILD ] || [ "$(find transfer/target/classes/ -anewer transfer/target/BUILD | grep transfer/target/classes/de )" != "" ] then - docker build -t trion/take-order-service:01 take-order - touch take-order/target/BUILD + echo "Rebuilding Docker-Image..." + docker-compose rm -svf transfer + mvn -f transfer/pom.xml docker:build + touch transfer/target/BUILD fi if [ "$1" = "build" ]; then exit; fi -trap 'kill $(jobs -p)' EXIT +docker-compose up -d zookeeper kafka -docker container start toolbox -docker-compose up -d zookeeper kafka schema-registry +while ! [[ $(docker-compose run --rm kafka zookeeper-shell zookeeper:2181 ls /brokers/ids 2> /dev/null) =~ 1001 ]]; do echo "Waiting for kafka..."; sleep 1; done -while ! [[ $(zookeeper-shell zookeeper:2181 ls /brokers/ids 2> /dev/null) =~ 1001 ]]; do echo "Waiting for kafka..."; sleep 1; done +docker-compose run --rm kafka kafka-topics --zookeeper zookeeper:2181 --if-not-exists --create --replication-factor 1 --partitions 5 --topic transfers -kafka-topics --zookeeper zookeeper:2181 --if-not-exists --create --replication-factor 1 --partitions 5 --topic orders +docker-compose up -d transfer -docker-compose up -d take-order +docker-compose run --name transferlog --rm kafka kafka-console-consumer --bootstrap-server kafka:9093 --topic transfers & +while ! [[ $(http 0:8091/actuator/health 2> /dev/null | jq -r .status ) =~ "UP" ]]; do echo "Waiting for transfer..."; sleep 1; done -kafka-avro-console-consumer --bootstrap-server kafka:9092 --topic orders & -while ! [[ $(http 0:8091/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for take-order..."; sleep 1; done +echo '{"id":1,"payer":1,"payee":2, "amount":10}' | http -v :8091/transfers +http :8091/transfers/1 +http -v :8091/transfers id=2 payer=2 payee=1 amount=5 +http :8091/transfers/2 -http -v post 0:8091/orders Accept:*/* id=1 customerId=2 productId=234 quantity=5 -http -v post 0:8091/orders Accept:*/* id=1 customerId=2 productId=234 quantity= -http -v post 0:8091/orders Accept:*/* id=1 customerId=2 productId=234 quantity=-5 -http -v post 0:8091/orders Accept:*/* id=1 customerId=2 productId=-234 quantiyt=5 -http -v post 0:8091/orders Accept:*/* customerId=2 productId=234 quantity=5 +docker container stop transferlog diff --git a/docker-compose.yml b/docker-compose.yml index 6ac7dd9..eb1c720 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -21,9 +21,11 @@ services: - zookeeper transfer: - image: juplo/transfer-service:mvp + image: juplo/payment-service-demo--transfer:1.0.0 ports: - "8091:8080" + environment: + juplo.transfer.bootstrap-servers: kafka:9093 depends_on: - zookeeper - kafka diff --git a/pom.xml b/pom.xml index 43a1e8e..e54a79a 100644 --- a/pom.xml +++ b/pom.xml @@ -3,8 +3,8 @@ 4.0.0 de.trion.kafka.payment - payment-bom - Payment System Example + payment-setup + Payment System Example - Setup 1.0.0 pom diff --git a/transfer b/transfer new file mode 160000 index 0000000..66a9bc5 --- /dev/null +++ b/transfer @@ -0,0 +1 @@ +Subproject commit 66a9bc5f68d82002decea07872c1bfbb9d7fef5f diff --git a/transfer/.dockerignore b/transfer/.dockerignore deleted file mode 100644 index 1ad9963..0000000 --- a/transfer/.dockerignore +++ /dev/null @@ -1,2 +0,0 @@ -* -!target/*.jar diff --git a/transfer/.gitignore b/transfer/.gitignore deleted file mode 100644 index 549e00a..0000000 --- a/transfer/.gitignore +++ /dev/null @@ -1,33 +0,0 @@ -HELP.md -target/ -!.mvn/wrapper/maven-wrapper.jar -!**/src/main/**/target/ -!**/src/test/**/target/ - -### STS ### -.apt_generated -.classpath -.factorypath -.project -.settings -.springBeans -.sts4-cache - -### IntelliJ IDEA ### -.idea -*.iws -*.iml -*.ipr - -### NetBeans ### -/nbproject/private/ -/nbbuild/ -/dist/ -/nbdist/ -/.nb-gradle/ -build/ -!**/src/main/**/build/ -!**/src/test/**/build/ - -### VS Code ### -.vscode/ diff --git a/transfer/Dockerfile b/transfer/Dockerfile deleted file mode 100644 index cd6a95b..0000000 --- a/transfer/Dockerfile +++ /dev/null @@ -1,4 +0,0 @@ -FROM openjdk:8-jre-slim -COPY target/take-order-01-0-SNAPSHOT.jar /opt/ -EXPOSE 8080 -CMD ["java", "-jar", "/opt/take-order-01-0-SNAPSHOT.jar"] diff --git a/transfer/pom.xml b/transfer/pom.xml deleted file mode 100644 index 8a26995..0000000 --- a/transfer/pom.xml +++ /dev/null @@ -1,78 +0,0 @@ - - - 4.0.0 - - org.springframework.boot - spring-boot-starter-parent - 2.5.1 - - - de.juplo.kafka.payment - transfer - 1.0-SNAPSHOT - Transfer Service - An MVP for the Transfer Service - - 11 - 6.2.0 - 2.8.0 - - - - org.springframework.boot - spring-boot-starter-actuator - - - org.springframework.boot - spring-boot-starter-web - - - org.springframework.boot - spring-boot-starter-validation - - - - org.apache.kafka - kafka-clients - - - org.springframework.boot - spring-boot-devtools - runtime - true - - - jakarta.validation - jakarta.validation-api - - - org.projectlombok - lombok - true - - - org.springframework.boot - spring-boot-starter-test - test - - - - - - - org.springframework.boot - spring-boot-maven-plugin - - - - org.projectlombok - lombok - - - - - - - - diff --git a/transfer/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java b/transfer/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java deleted file mode 100644 index 320b841..0000000 --- a/transfer/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java +++ /dev/null @@ -1,51 +0,0 @@ -package de.juplo.kafka.payment.transfer; - - -import com.fasterxml.jackson.databind.ObjectMapper; -import de.juplo.kafka.payment.transfer.domain.TransferRepository; -import de.juplo.kafka.payment.transfer.domain.TransferService; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.serialization.StringSerializer; -import org.springframework.boot.SpringApplication; -import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.context.annotation.Bean; - -import java.util.Properties; -import java.util.UUID; - - -@SpringBootApplication -@EnableConfigurationProperties(TransferServiceProperties.class) -@Slf4j -public class TransferServiceApplication -{ - @Bean(destroyMethod = "close") - KafkaProducer producer(TransferServiceProperties properties) - { - Properties props = new Properties(); - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.bootstrapServers); - props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - - return new KafkaProducer<>(props); - } - - @Bean - TransferService transferService( - TransferRepository repository, - KafkaProducer producer, - ObjectMapper mapper, - TransferServiceProperties properties) - { - return new TransferService(repository, producer, mapper, properties.topic); - } - - - public static void main(String[] args) - { - SpringApplication.run(TransferServiceApplication.class, args); - } -} diff --git a/transfer/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceProperties.java b/transfer/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceProperties.java deleted file mode 100644 index ccd22a3..0000000 --- a/transfer/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceProperties.java +++ /dev/null @@ -1,16 +0,0 @@ -package de.juplo.kafka.payment.transfer; - - -import lombok.Getter; -import lombok.Setter; -import org.springframework.boot.context.properties.ConfigurationProperties; - - -@ConfigurationProperties("juplo.transfer") -@Getter -@Setter -public class TransferServiceProperties -{ - String bootstrapServers = "localhost:9092"; - String topic = "transfers"; -} diff --git a/transfer/src/main/java/de/juplo/kafka/payment/transfer/controller/TransferController.java b/transfer/src/main/java/de/juplo/kafka/payment/transfer/controller/TransferController.java deleted file mode 100644 index b39322a..0000000 --- a/transfer/src/main/java/de/juplo/kafka/payment/transfer/controller/TransferController.java +++ /dev/null @@ -1,57 +0,0 @@ -package de.juplo.kafka.payment.transfer.controller; - - -import de.juplo.kafka.payment.transfer.domain.Transfer; -import de.juplo.kafka.payment.transfer.domain.TransferService; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.springframework.http.MediaType; -import org.springframework.http.ResponseEntity; -import org.springframework.web.bind.annotation.*; - -import javax.validation.Valid; -import java.net.URI; - - -@RestController -@RequiredArgsConstructor -@Slf4j - public class TransferController -{ - public final static String PATH = "/transfers"; - - private final TransferService service; - - - @PostMapping( - path = PATH, - consumes = MediaType.APPLICATION_JSON_VALUE, - produces = MediaType.APPLICATION_JSON_VALUE) - public ResponseEntity transfer(@Valid @RequestBody TransferDTO transferDTO) - { - Transfer transfer = - Transfer - .builder() - .id(transferDTO.getId()) - .payer(transferDTO.getPayer()) - .payee(transferDTO.getPayee()) - .amount(transferDTO.getAmount()) - .build(); - - service.initiate(transfer); - - return ResponseEntity.created(URI.create(PATH + "/" + transferDTO.getId())).build(); - } - - @GetMapping( - path = PATH + "/{id}", - produces = MediaType.APPLICATION_JSON_VALUE) - public ResponseEntity get(@PathVariable Long id) - { - return - service - .get(id) - .map(transfer -> ResponseEntity.ok(TransferDTO.of(transfer))) - .orElse(ResponseEntity.notFound().build()); - } -} diff --git a/transfer/src/main/java/de/juplo/kafka/payment/transfer/controller/TransferDTO.java b/transfer/src/main/java/de/juplo/kafka/payment/transfer/controller/TransferDTO.java deleted file mode 100644 index 1d2ffa8..0000000 --- a/transfer/src/main/java/de/juplo/kafka/payment/transfer/controller/TransferDTO.java +++ /dev/null @@ -1,59 +0,0 @@ -package de.juplo.kafka.payment.transfer.controller; - -import de.juplo.kafka.payment.transfer.domain.Transfer; -import lombok.Builder; -import lombok.Data; - -import javax.validation.constraints.Min; -import javax.validation.constraints.NotNull; - - -/** - * Simple DTO used by the REST interface - */ -@Data -@Builder -public class TransferDTO -{ - @NotNull(message = "Cannot be null") - @Min(value = 1, message = "A valid transfer id must be a positive number") - private Long id; - @NotNull(message = "Cannot be null") - @Min(value = 1, message = "A valid banc account id must be a positive number") - private Long payer; - @NotNull(message = "Cannot be null") - @Min(value = 1, message = "A valid banc account id must be a positive number") - private Long payee; - @NotNull(message = "Cannot be null") - @Min(value = 1, message = "Cannot transfer a non-positiv amount") - private Integer amount; - - private Transfer.State state; - - - public Transfer toTransfer() - { - return - Transfer - .builder() - .id(id) - .payer(payer) - .payee(payee) - .amount(amount) - .build(); - } - - - public static TransferDTO of(Transfer transfer) - { - return - TransferDTO - .builder() - .id(transfer.getId()) - .payer(transfer.getPayer()) - .payee(transfer.getPayee()) - .amount(transfer.getAmount()) - .state(transfer.getState()) - .build(); - } -} diff --git a/transfer/src/main/java/de/juplo/kafka/payment/transfer/domain/Transfer.java b/transfer/src/main/java/de/juplo/kafka/payment/transfer/domain/Transfer.java deleted file mode 100644 index 5556a1b..0000000 --- a/transfer/src/main/java/de/juplo/kafka/payment/transfer/domain/Transfer.java +++ /dev/null @@ -1,29 +0,0 @@ -package de.juplo.kafka.payment.transfer.domain; - - -import lombok.Builder; -import lombok.Data; -import lombok.EqualsAndHashCode; - - -@Data -@Builder -@EqualsAndHashCode(exclude = "state") -public class Transfer -{ - public enum State - { - SENT, - FAILED, - PENDING, - APPROVED, - REJECTED - } - - private final long id; - private final long payer; - private final long payee; - private final int amount; - - private State state; -} diff --git a/transfer/src/main/java/de/juplo/kafka/payment/transfer/domain/TransferRepository.java b/transfer/src/main/java/de/juplo/kafka/payment/transfer/domain/TransferRepository.java deleted file mode 100644 index 36d027c..0000000 --- a/transfer/src/main/java/de/juplo/kafka/payment/transfer/domain/TransferRepository.java +++ /dev/null @@ -1,15 +0,0 @@ -package de.juplo.kafka.payment.transfer.domain; - -import java.util.Optional; - - -public interface TransferRepository -{ - void store(Transfer transfer); - - Optional get(Long id); - - void update(Long id, Transfer.State oldState, Transfer.State newState) throws IllegalArgumentException; - - void remove(Long id); -} diff --git a/transfer/src/main/java/de/juplo/kafka/payment/transfer/domain/TransferService.java b/transfer/src/main/java/de/juplo/kafka/payment/transfer/domain/TransferService.java deleted file mode 100644 index d708826..0000000 --- a/transfer/src/main/java/de/juplo/kafka/payment/transfer/domain/TransferService.java +++ /dev/null @@ -1,89 +0,0 @@ -package de.juplo.kafka.payment.transfer.domain; - - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerRecord; - -import java.util.Optional; - -import static de.juplo.kafka.payment.transfer.domain.Transfer.State.*; - - -@Slf4j -@RequiredArgsConstructor -public class TransferService -{ - private final TransferRepository repository; - private final KafkaProducer producer; - private final ObjectMapper mapper; - private final String topic; - - public synchronized void initiate(Transfer transfer) - { - repository - .get(transfer.getId()) - .ifPresentOrElse( - stored -> - { - if (!transfer.equals(stored)) - throw new IllegalArgumentException( - "Re-Initiation of transfer with different data: old=" + - stored + - ", new=" + - transfer); - - if (stored.getState() == FAILED) - { - repository.update(transfer.getId(), FAILED, SENT); - log.info("Resending faild transfer: " + stored); - send(transfer); - } - }, - () -> - { - send(transfer); - transfer.setState(SENT); - repository.store(transfer); - }); - } - - - private void send(Transfer transfer) - { - try - { - ProducerRecord record = - new ProducerRecord<>( - topic, - Long.toString(transfer.getId()), - mapper.writeValueAsString(transfer)); - - producer.send(record, (metadata, exception) -> - { - if (metadata != null) - { - log.debug("Sent {} to {}/{}:{}", transfer, metadata.topic(), metadata.partition(), metadata.offset()); - repository.update(transfer.getId(), SENT, PENDING); - } - else - { - log.error("Could not send {}: {}", transfer, exception.getMessage()); - repository.update(transfer.getId(), SENT, FAILED); - } - }); - } - catch (JsonProcessingException e) - { - throw new RuntimeException("Could not convert " + transfer, e); - } - } - - public Optional get(Long id) - { - return repository.get(id); - } -} diff --git a/transfer/src/main/java/de/juplo/kafka/payment/transfer/persistence/InMemoryTransferRepository.java b/transfer/src/main/java/de/juplo/kafka/payment/transfer/persistence/InMemoryTransferRepository.java deleted file mode 100644 index 5ef2094..0000000 --- a/transfer/src/main/java/de/juplo/kafka/payment/transfer/persistence/InMemoryTransferRepository.java +++ /dev/null @@ -1,81 +0,0 @@ -package de.juplo.kafka.payment.transfer.persistence; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import de.juplo.kafka.payment.transfer.domain.Transfer; -import de.juplo.kafka.payment.transfer.domain.TransferRepository; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Component; - -import java.util.HashMap; -import java.util.Map; -import java.util.Optional; - - -@Component -@RequiredArgsConstructor -@Slf4j -public class InMemoryTransferRepository implements TransferRepository -{ - private final Map map = new HashMap<>(); - private final ObjectMapper mapper; - - - @Override - public synchronized void store(Transfer transfer) - { - Optional - .ofNullable(map.get(transfer.getId())) - .ifPresent( - json -> - { - throw new IllegalArgumentException("Could not overwrite " + json + " with " + transfer); - }); - - try - { - map.put(transfer.getId(), mapper.writeValueAsString(transfer)); - } - catch (JsonProcessingException e) - { - log.error("Could not convert Transfer.class: {}", transfer, e); - } - } - - @Override - public synchronized Optional get(Long id) - { - return - Optional - .ofNullable(map.get(id)) - .map(json -> { - try - { - return mapper.readValue(json, Transfer.class); - } - catch (JsonProcessingException e) - { - throw new RuntimeException("Could not convert JSON: " + json, e); - } - }); - } - - @Override - public synchronized void update(Long id, Transfer.State oldState, Transfer.State newState) - { - Transfer transfer = get(id).orElseThrow(() -> new IllegalArgumentException("Could not find transfer " + id)); - - if (transfer.getState() != oldState) - throw new IllegalArgumentException(("Unexpectd state for " + transfer + ", expected: " + oldState)); - - transfer.setState(newState); - store(transfer); - } - - @Override - public void remove(Long id) - { - map.remove(id); - } -} diff --git a/transfer/src/main/resources/application.properties b/transfer/src/main/resources/application.properties deleted file mode 100644 index f22f985..0000000 --- a/transfer/src/main/resources/application.properties +++ /dev/null @@ -1,3 +0,0 @@ -management.endpoints.web.exposure.include=* - -logging.level.de.trion=info diff --git a/transfer/src/test/java/de/juplo/kafka/payment/transfer/TransferServiceApplicationTests.java b/transfer/src/test/java/de/juplo/kafka/payment/transfer/TransferServiceApplicationTests.java deleted file mode 100644 index 29b0d02..0000000 --- a/transfer/src/test/java/de/juplo/kafka/payment/transfer/TransferServiceApplicationTests.java +++ /dev/null @@ -1,13 +0,0 @@ -package de.juplo.kafka.payment.transfer; - -import org.junit.jupiter.api.Test; -import org.springframework.boot.test.context.SpringBootTest; - -@SpringBootTest -class TransferServiceApplicationTests { - - @Test - void contextLoads() { - } - -}