--- /dev/null
+[submodule "transfer"]
+ path = transfer
+ url = ../transfer
then
docker-compose down
mvn clean
- rm -rvf */src/main/java/de/trion/microservices/avro
exit
fi
mvn package || exit 1
-if [ ! -e take-order/target/BUILD ] || [ "$(find take-order/target/classes/ -anewer take-order/target/BUILD | grep take-order/target/classes/de )" != "" ]
+if [ ! -e transfer/target/BUILD ] || [ "$(find transfer/target/classes/ -anewer transfer/target/BUILD | grep transfer/target/classes/de )" != "" ]
then
- docker build -t trion/take-order-service:01 take-order
- touch take-order/target/BUILD
+ echo "Rebuilding Docker-Image..."
+ docker-compose rm -svf transfer
+ mvn -f transfer/pom.xml docker:build
+ touch transfer/target/BUILD
fi
if [ "$1" = "build" ]; then exit; fi
-trap 'kill $(jobs -p)' EXIT
+docker-compose up -d zookeeper kafka
-docker container start toolbox
-docker-compose up -d zookeeper kafka schema-registry
+while ! [[ $(docker-compose run --rm kafka zookeeper-shell zookeeper:2181 ls /brokers/ids 2> /dev/null) =~ 1001 ]]; do echo "Waiting for kafka..."; sleep 1; done
-while ! [[ $(zookeeper-shell zookeeper:2181 ls /brokers/ids 2> /dev/null) =~ 1001 ]]; do echo "Waiting for kafka..."; sleep 1; done
+docker-compose run --rm kafka kafka-topics --zookeeper zookeeper:2181 --if-not-exists --create --replication-factor 1 --partitions 5 --topic transfers
-kafka-topics --zookeeper zookeeper:2181 --if-not-exists --create --replication-factor 1 --partitions 5 --topic orders
+docker-compose up -d transfer
-docker-compose up -d take-order
+docker-compose run --name transferlog --rm kafka kafka-console-consumer --bootstrap-server kafka:9093 --topic transfers &
+while ! [[ $(http 0:8091/actuator/health 2> /dev/null | jq -r .status ) =~ "UP" ]]; do echo "Waiting for transfer..."; sleep 1; done
-kafka-avro-console-consumer --bootstrap-server kafka:9092 --topic orders &
-while ! [[ $(http 0:8091/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for take-order..."; sleep 1; done
+echo '{"id":1,"payer":1,"payee":2, "amount":10}' | http -v :8091/transfers
+http :8091/transfers/1
+http -v :8091/transfers id=2 payer=2 payee=1 amount=5
+http :8091/transfers/2
-http -v post 0:8091/orders Accept:*/* id=1 customerId=2 productId=234 quantity=5
-http -v post 0:8091/orders Accept:*/* id=1 customerId=2 productId=234 quantity=
-http -v post 0:8091/orders Accept:*/* id=1 customerId=2 productId=234 quantity=-5
-http -v post 0:8091/orders Accept:*/* id=1 customerId=2 productId=-234 quantiyt=5
-http -v post 0:8091/orders Accept:*/* customerId=2 productId=234 quantity=5
+docker container stop transferlog
- zookeeper
transfer:
- image: juplo/transfer-service:mvp
+ image: juplo/payment-service-demo--transfer:1.0.0
ports:
- "8091:8080"
+ environment:
+ juplo.transfer.bootstrap-servers: kafka:9093
depends_on:
- zookeeper
- kafka
<modelVersion>4.0.0</modelVersion>
<groupId>de.trion.kafka.payment</groupId>
- <artifactId>payment-bom</artifactId>
- <name>Payment System Example</name>
+ <artifactId>payment-setup</artifactId>
+ <name>Payment System Example - Setup</name>
<version>1.0.0</version>
<packaging>pom</packaging>
--- /dev/null
+Subproject commit 66a9bc5f68d82002decea07872c1bfbb9d7fef5f
+++ /dev/null
-*
-!target/*.jar
+++ /dev/null
-HELP.md
-target/
-!.mvn/wrapper/maven-wrapper.jar
-!**/src/main/**/target/
-!**/src/test/**/target/
-
-### STS ###
-.apt_generated
-.classpath
-.factorypath
-.project
-.settings
-.springBeans
-.sts4-cache
-
-### IntelliJ IDEA ###
-.idea
-*.iws
-*.iml
-*.ipr
-
-### NetBeans ###
-/nbproject/private/
-/nbbuild/
-/dist/
-/nbdist/
-/.nb-gradle/
-build/
-!**/src/main/**/build/
-!**/src/test/**/build/
-
-### VS Code ###
-.vscode/
+++ /dev/null
-FROM openjdk:8-jre-slim
-COPY target/take-order-01-0-SNAPSHOT.jar /opt/
-EXPOSE 8080
-CMD ["java", "-jar", "/opt/take-order-01-0-SNAPSHOT.jar"]
+++ /dev/null
-<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-parent</artifactId>
- <version>2.5.1</version>
- <relativePath/> <!-- lookup parent from repository -->
- </parent>
- <groupId>de.juplo.kafka.payment</groupId>
- <artifactId>transfer</artifactId>
- <version>1.0-SNAPSHOT</version>
- <name>Transfer Service</name>
- <description>An MVP for the Transfer Service</description>
- <properties>
- <java.version>11</java.version>
- <confluent.version>6.2.0</confluent.version>
- <kafka.version>2.8.0</kafka.version>
- </properties>
- <dependencies>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-actuator</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-validation</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-clients</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-devtools</artifactId>
- <scope>runtime</scope>
- <optional>true</optional>
- </dependency>
- <dependency>
- <groupId>jakarta.validation</groupId>
- <artifactId>jakarta.validation-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.projectlombok</groupId>
- <artifactId>lombok</artifactId>
- <optional>true</optional>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-test</artifactId>
- <scope>test</scope>
- </dependency>
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-maven-plugin</artifactId>
- <configuration>
- <excludes>
- <exclude>
- <groupId>org.projectlombok</groupId>
- <artifactId>lombok</artifactId>
- </exclude>
- </excludes>
- </configuration>
- </plugin>
- </plugins>
- </build>
-
-</project>
+++ /dev/null
-package de.juplo.kafka.payment.transfer;
-
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import de.juplo.kafka.payment.transfer.domain.TransferRepository;
-import de.juplo.kafka.payment.transfer.domain.TransferService;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.Bean;
-
-import java.util.Properties;
-import java.util.UUID;
-
-
-@SpringBootApplication
-@EnableConfigurationProperties(TransferServiceProperties.class)
-@Slf4j
-public class TransferServiceApplication
-{
- @Bean(destroyMethod = "close")
- KafkaProducer<String, String> producer(TransferServiceProperties properties)
- {
- Properties props = new Properties();
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.bootstrapServers);
- props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-
- return new KafkaProducer<>(props);
- }
-
- @Bean
- TransferService transferService(
- TransferRepository repository,
- KafkaProducer<String, String> producer,
- ObjectMapper mapper,
- TransferServiceProperties properties)
- {
- return new TransferService(repository, producer, mapper, properties.topic);
- }
-
-
- public static void main(String[] args)
- {
- SpringApplication.run(TransferServiceApplication.class, args);
- }
-}
+++ /dev/null
-package de.juplo.kafka.payment.transfer;
-
-
-import lombok.Getter;
-import lombok.Setter;
-import org.springframework.boot.context.properties.ConfigurationProperties;
-
-
-@ConfigurationProperties("juplo.transfer")
-@Getter
-@Setter
-public class TransferServiceProperties
-{
- String bootstrapServers = "localhost:9092";
- String topic = "transfers";
-}
+++ /dev/null
-package de.juplo.kafka.payment.transfer.controller;
-
-
-import de.juplo.kafka.payment.transfer.domain.Transfer;
-import de.juplo.kafka.payment.transfer.domain.TransferService;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.http.MediaType;
-import org.springframework.http.ResponseEntity;
-import org.springframework.web.bind.annotation.*;
-
-import javax.validation.Valid;
-import java.net.URI;
-
-
-@RestController
-@RequiredArgsConstructor
-@Slf4j
- public class TransferController
-{
- public final static String PATH = "/transfers";
-
- private final TransferService service;
-
-
- @PostMapping(
- path = PATH,
- consumes = MediaType.APPLICATION_JSON_VALUE,
- produces = MediaType.APPLICATION_JSON_VALUE)
- public ResponseEntity<?> transfer(@Valid @RequestBody TransferDTO transferDTO)
- {
- Transfer transfer =
- Transfer
- .builder()
- .id(transferDTO.getId())
- .payer(transferDTO.getPayer())
- .payee(transferDTO.getPayee())
- .amount(transferDTO.getAmount())
- .build();
-
- service.initiate(transfer);
-
- return ResponseEntity.created(URI.create(PATH + "/" + transferDTO.getId())).build();
- }
-
- @GetMapping(
- path = PATH + "/{id}",
- produces = MediaType.APPLICATION_JSON_VALUE)
- public ResponseEntity<TransferDTO> get(@PathVariable Long id)
- {
- return
- service
- .get(id)
- .map(transfer -> ResponseEntity.ok(TransferDTO.of(transfer)))
- .orElse(ResponseEntity.notFound().build());
- }
-}
+++ /dev/null
-package de.juplo.kafka.payment.transfer.controller;
-
-import de.juplo.kafka.payment.transfer.domain.Transfer;
-import lombok.Builder;
-import lombok.Data;
-
-import javax.validation.constraints.Min;
-import javax.validation.constraints.NotNull;
-
-
-/**
- * Simple DTO used by the REST interface
- */
-@Data
-@Builder
-public class TransferDTO
-{
- @NotNull(message = "Cannot be null")
- @Min(value = 1, message = "A valid transfer id must be a positive number")
- private Long id;
- @NotNull(message = "Cannot be null")
- @Min(value = 1, message = "A valid banc account id must be a positive number")
- private Long payer;
- @NotNull(message = "Cannot be null")
- @Min(value = 1, message = "A valid banc account id must be a positive number")
- private Long payee;
- @NotNull(message = "Cannot be null")
- @Min(value = 1, message = "Cannot transfer a non-positiv amount")
- private Integer amount;
-
- private Transfer.State state;
-
-
- public Transfer toTransfer()
- {
- return
- Transfer
- .builder()
- .id(id)
- .payer(payer)
- .payee(payee)
- .amount(amount)
- .build();
- }
-
-
- public static TransferDTO of(Transfer transfer)
- {
- return
- TransferDTO
- .builder()
- .id(transfer.getId())
- .payer(transfer.getPayer())
- .payee(transfer.getPayee())
- .amount(transfer.getAmount())
- .state(transfer.getState())
- .build();
- }
-}
+++ /dev/null
-package de.juplo.kafka.payment.transfer.domain;
-
-
-import lombok.Builder;
-import lombok.Data;
-import lombok.EqualsAndHashCode;
-
-
-@Data
-@Builder
-@EqualsAndHashCode(exclude = "state")
-public class Transfer
-{
- public enum State
- {
- SENT,
- FAILED,
- PENDING,
- APPROVED,
- REJECTED
- }
-
- private final long id;
- private final long payer;
- private final long payee;
- private final int amount;
-
- private State state;
-}
+++ /dev/null
-package de.juplo.kafka.payment.transfer.domain;
-
-import java.util.Optional;
-
-
-public interface TransferRepository
-{
- void store(Transfer transfer);
-
- Optional<Transfer> get(Long id);
-
- void update(Long id, Transfer.State oldState, Transfer.State newState) throws IllegalArgumentException;
-
- void remove(Long id);
-}
+++ /dev/null
-package de.juplo.kafka.payment.transfer.domain;
-
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-
-import java.util.Optional;
-
-import static de.juplo.kafka.payment.transfer.domain.Transfer.State.*;
-
-
-@Slf4j
-@RequiredArgsConstructor
-public class TransferService
-{
- private final TransferRepository repository;
- private final KafkaProducer<String, String> producer;
- private final ObjectMapper mapper;
- private final String topic;
-
- public synchronized void initiate(Transfer transfer)
- {
- repository
- .get(transfer.getId())
- .ifPresentOrElse(
- stored ->
- {
- if (!transfer.equals(stored))
- throw new IllegalArgumentException(
- "Re-Initiation of transfer with different data: old=" +
- stored +
- ", new=" +
- transfer);
-
- if (stored.getState() == FAILED)
- {
- repository.update(transfer.getId(), FAILED, SENT);
- log.info("Resending faild transfer: " + stored);
- send(transfer);
- }
- },
- () ->
- {
- send(transfer);
- transfer.setState(SENT);
- repository.store(transfer);
- });
- }
-
-
- private void send(Transfer transfer)
- {
- try
- {
- ProducerRecord<String, String> record =
- new ProducerRecord<>(
- topic,
- Long.toString(transfer.getId()),
- mapper.writeValueAsString(transfer));
-
- producer.send(record, (metadata, exception) ->
- {
- if (metadata != null)
- {
- log.debug("Sent {} to {}/{}:{}", transfer, metadata.topic(), metadata.partition(), metadata.offset());
- repository.update(transfer.getId(), SENT, PENDING);
- }
- else
- {
- log.error("Could not send {}: {}", transfer, exception.getMessage());
- repository.update(transfer.getId(), SENT, FAILED);
- }
- });
- }
- catch (JsonProcessingException e)
- {
- throw new RuntimeException("Could not convert " + transfer, e);
- }
- }
-
- public Optional<Transfer> get(Long id)
- {
- return repository.get(id);
- }
-}
+++ /dev/null
-package de.juplo.kafka.payment.transfer.persistence;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import de.juplo.kafka.payment.transfer.domain.Transfer;
-import de.juplo.kafka.payment.transfer.domain.TransferRepository;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.stereotype.Component;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-
-
-@Component
-@RequiredArgsConstructor
-@Slf4j
-public class InMemoryTransferRepository implements TransferRepository
-{
- private final Map<Long, String> map = new HashMap<>();
- private final ObjectMapper mapper;
-
-
- @Override
- public synchronized void store(Transfer transfer)
- {
- Optional
- .ofNullable(map.get(transfer.getId()))
- .ifPresent(
- json ->
- {
- throw new IllegalArgumentException("Could not overwrite " + json + " with " + transfer);
- });
-
- try
- {
- map.put(transfer.getId(), mapper.writeValueAsString(transfer));
- }
- catch (JsonProcessingException e)
- {
- log.error("Could not convert Transfer.class: {}", transfer, e);
- }
- }
-
- @Override
- public synchronized Optional<Transfer> get(Long id)
- {
- return
- Optional
- .ofNullable(map.get(id))
- .map(json -> {
- try
- {
- return mapper.readValue(json, Transfer.class);
- }
- catch (JsonProcessingException e)
- {
- throw new RuntimeException("Could not convert JSON: " + json, e);
- }
- });
- }
-
- @Override
- public synchronized void update(Long id, Transfer.State oldState, Transfer.State newState)
- {
- Transfer transfer = get(id).orElseThrow(() -> new IllegalArgumentException("Could not find transfer " + id));
-
- if (transfer.getState() != oldState)
- throw new IllegalArgumentException(("Unexpectd state for " + transfer + ", expected: " + oldState));
-
- transfer.setState(newState);
- store(transfer);
- }
-
- @Override
- public void remove(Long id)
- {
- map.remove(id);
- }
-}
+++ /dev/null
-management.endpoints.web.exposure.include=*
-
-logging.level.de.trion=info
+++ /dev/null
-package de.juplo.kafka.payment.transfer;
-
-import org.junit.jupiter.api.Test;
-import org.springframework.boot.test.context.SpringBootTest;
-
-@SpringBootTest
-class TransferServiceApplicationTests {
-
- @Test
- void contextLoads() {
- }
-
-}