--- /dev/null
+*
+!target/*.jar
--- /dev/null
+HELP.md
+target/
+!.mvn/wrapper/maven-wrapper.jar
+!**/src/main/**/target/
+!**/src/test/**/target/
+
+### STS ###
+.apt_generated
+.classpath
+.factorypath
+.project
+.settings
+.springBeans
+.sts4-cache
+
+### IntelliJ IDEA ###
+.idea
+*.iws
+*.iml
+*.ipr
+
+### NetBeans ###
+/nbproject/private/
+/nbbuild/
+/dist/
+/nbdist/
+/.nb-gradle/
+build/
+!**/src/main/**/build/
+!**/src/test/**/build/
+
+### VS Code ###
+.vscode/
--- /dev/null
+FROM openjdk:8-jre-slim
+COPY target/take-order-01-0-SNAPSHOT.jar /opt/
+EXPOSE 8080
+CMD ["java", "-jar", "/opt/take-order-01-0-SNAPSHOT.jar"]
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-parent</artifactId>
+ <version>2.5.1</version>
+ <relativePath/> <!-- lookup parent from repository -->
+ </parent>
+ <groupId>de.juplo.kafka.payment</groupId>
+ <artifactId>transfer</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ <name>Transfer Service</name>
+ <description>An MVP for the Transfer Service</description>
+ <properties>
+ <java.version>11</java.version>
+ <confluent.version>6.2.0</confluent.version>
+ <kafka.version>2.8.0</kafka.version>
+ </properties>
+ <dependencies>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-actuator</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-web</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-validation</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-devtools</artifactId>
+ <scope>runtime</scope>
+ <optional>true</optional>
+ </dependency>
+ <dependency>
+ <groupId>jakarta.validation</groupId>
+ <artifactId>jakarta.validation-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.projectlombok</groupId>
+ <artifactId>lombok</artifactId>
+ <optional>true</optional>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-test</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-maven-plugin</artifactId>
+ <configuration>
+ <excludes>
+ <exclude>
+ <groupId>org.projectlombok</groupId>
+ <artifactId>lombok</artifactId>
+ </exclude>
+ </excludes>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
--- /dev/null
+package de.juplo.kafka.payment.transfer;
+
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import de.juplo.kafka.payment.transfer.domain.TransferRepository;
+import de.juplo.kafka.payment.transfer.domain.TransferService;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+
+import java.util.Properties;
+import java.util.UUID;
+
+
+@SpringBootApplication
+@EnableConfigurationProperties(TransferServiceProperties.class)
+@Slf4j
+public class TransferServiceApplication
+{
+ @Bean(destroyMethod = "close")
+ KafkaProducer<String, String> producer(TransferServiceProperties properties)
+ {
+ Properties props = new Properties();
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.bootstrapServers);
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+
+ return new KafkaProducer<>(props);
+ }
+
+ @Bean
+ TransferService transferService(
+ TransferRepository repository,
+ KafkaProducer<String, String> producer,
+ ObjectMapper mapper,
+ TransferServiceProperties properties)
+ {
+ return new TransferService(repository, producer, mapper, properties.topic);
+ }
+
+
+ public static void main(String[] args)
+ {
+ SpringApplication.run(TransferServiceApplication.class, args);
+ }
+}
--- /dev/null
+package de.juplo.kafka.payment.transfer;
+
+
+import lombok.Getter;
+import lombok.Setter;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+
+@ConfigurationProperties("juplo.transfer")
+@Getter
+@Setter
+public class TransferServiceProperties
+{
+ String bootstrapServers = "localhost:9092";
+ String topic = "transfers";
+}
--- /dev/null
+package de.juplo.kafka.payment.transfer.controller;
+
+
+import de.juplo.kafka.payment.transfer.domain.Transfer;
+import de.juplo.kafka.payment.transfer.domain.TransferService;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.MediaType;
+import org.springframework.http.ResponseEntity;
+import org.springframework.validation.FieldError;
+import org.springframework.web.bind.MethodArgumentNotValidException;
+import org.springframework.web.bind.annotation.*;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.validation.Valid;
+import java.net.URI;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+
+@RestController
+@RequiredArgsConstructor
+@Slf4j
+ public class TransferController
+{
+ public final static String PATH = "/transfers";
+
+ private final TransferService service;
+
+
+ @PostMapping(
+ path = PATH,
+ consumes = MediaType.APPLICATION_JSON_VALUE,
+ produces = MediaType.APPLICATION_JSON_VALUE)
+ public ResponseEntity<?> transfer(@Valid @RequestBody TransferDTO transferDTO)
+ {
+ Transfer transfer =
+ Transfer
+ .builder()
+ .id(transferDTO.getId())
+ .payer(transferDTO.getPayer())
+ .payee(transferDTO.getPayee())
+ .amount(transferDTO.getAmount())
+ .build();
+
+ service.initiate(transfer);
+
+ return ResponseEntity.created(URI.create(PATH + "/" + transferDTO.getId())).build();
+ }
+
+ @GetMapping(
+ path = PATH + "/{id}",
+ produces = MediaType.APPLICATION_JSON_VALUE)
+ public ResponseEntity<TransferDTO> get(@PathVariable Long id)
+ {
+ return
+ service
+ .get(id)
+ .map(transfer -> ResponseEntity.ok(TransferDTO.of(transfer)))
+ .orElse(ResponseEntity.notFound().build());
+ }
+
+ @ResponseStatus(HttpStatus.BAD_REQUEST)
+ @ExceptionHandler(MethodArgumentNotValidException.class)
+ public Map<String, Object> handleValidationExceptions(
+ HttpServletRequest request,
+ MethodArgumentNotValidException e)
+ {
+ Map<String, Object> errorAttributes = new HashMap<>();
+ errorAttributes.put("status", HttpStatus.BAD_REQUEST.value());
+ errorAttributes.put("error", HttpStatus.BAD_REQUEST.getReasonPhrase());
+ errorAttributes.put("path", request.getRequestURI());
+ errorAttributes.put("method", request.getMethod());
+ errorAttributes.put("timestamp", new Date());
+ Map<String, String> errors = new HashMap<>();
+ e.getBindingResult().getAllErrors().forEach((error) -> {
+ String fieldName = ((FieldError) error).getField();
+ String errorMessage = error.getDefaultMessage();
+ errors.put(fieldName, errorMessage);
+ });
+ errorAttributes.put("errors", errors);
+ errorAttributes.put("message", "Validation failed: Invalid message format, error count: " + errors.size());
+ return errorAttributes;
+ }
+}
--- /dev/null
+package de.juplo.kafka.payment.transfer.controller;
+
+import de.juplo.kafka.payment.transfer.domain.Transfer;
+import lombok.Builder;
+import lombok.Data;
+
+import javax.validation.constraints.Min;
+import javax.validation.constraints.NotNull;
+
+
+/**
+ * Simple DTO used by the REST interface
+ */
+@Data
+@Builder
+public class TransferDTO
+{
+ @NotNull(message = "Cannot be null")
+ @Min(value = 1, message = "A valid transfer id must be a positive number")
+ private Long id;
+ @NotNull(message = "Cannot be null")
+ @Min(value = 1, message = "A valid bank account id must be a positive number")
+ private Long payer;
+ @NotNull(message = "Cannot be null")
+ @Min(value = 1, message = "A valid bank account id must be a positive number")
+ private Long payee;
+ @NotNull(message = "Cannot be null")
+ @Min(value = 1, message = "The amount of a transfer must be a positv value")
+ private Integer amount;
+
+ private Transfer.State state;
+
+
+ public Transfer toTransfer()
+ {
+ return
+ Transfer
+ .builder()
+ .id(id)
+ .payer(payer)
+ .payee(payee)
+ .amount(amount)
+ .build();
+ }
+
+
+ public static TransferDTO of(Transfer transfer)
+ {
+ return
+ TransferDTO
+ .builder()
+ .id(transfer.getId())
+ .payer(transfer.getPayer())
+ .payee(transfer.getPayee())
+ .amount(transfer.getAmount())
+ .state(transfer.getState())
+ .build();
+ }
+}
--- /dev/null
+package de.juplo.kafka.payment.transfer.domain;
+
+
+import lombok.Builder;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+
+@Data
+@Builder
+@EqualsAndHashCode(exclude = "state")
+public class Transfer
+{
+ public enum State
+ {
+ SENT,
+ FAILED,
+ PENDING,
+ APPROVED,
+ REJECTED
+ }
+
+ private final long id;
+ private final long payer;
+ private final long payee;
+ private final int amount;
+
+ private State state;
+}
--- /dev/null
+package de.juplo.kafka.payment.transfer.domain;
+
+import java.util.Optional;
+
+
+public interface TransferRepository
+{
+ void store(Transfer transfer);
+
+ Optional<Transfer> get(Long id);
+
+ void update(Long id, Transfer.State oldState, Transfer.State newState) throws IllegalArgumentException;
+
+ void remove(Long id);
+}
--- /dev/null
+package de.juplo.kafka.payment.transfer.domain;
+
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import java.util.Optional;
+
+import static de.juplo.kafka.payment.transfer.domain.Transfer.State.*;
+
+
+@Slf4j
+@RequiredArgsConstructor
+public class TransferService
+{
+ private final TransferRepository repository;
+ private final KafkaProducer<String, String> producer;
+ private final ObjectMapper mapper;
+ private final String topic;
+
+ public synchronized void initiate(Transfer transfer)
+ {
+ repository
+ .get(transfer.getId())
+ .ifPresentOrElse(
+ stored ->
+ {
+ if (!transfer.equals(stored))
+ throw new IllegalArgumentException(
+ "Re-Initiation of transfer with different data: old=" +
+ stored +
+ ", new=" +
+ transfer);
+
+ if (stored.getState() == FAILED)
+ {
+ repository.update(transfer.getId(), FAILED, SENT);
+ log.info("Resending faild transfer: " + stored);
+ send(transfer);
+ }
+ },
+ () ->
+ {
+ send(transfer);
+ transfer.setState(SENT);
+ repository.store(transfer);
+ });
+ }
+
+
+ private void send(Transfer transfer)
+ {
+ try
+ {
+ ProducerRecord<String, String> record =
+ new ProducerRecord<>(
+ topic,
+ Long.toString(transfer.getId()),
+ mapper.writeValueAsString(transfer));
+
+ producer.send(record, (metadata, exception) ->
+ {
+ if (metadata != null)
+ {
+ log.debug("Sent {} to {}/{}:{}", transfer, metadata.topic(), metadata.partition(), metadata.offset());
+ repository.update(transfer.getId(), SENT, PENDING);
+ }
+ else
+ {
+ log.error("Could not send {}: {}", transfer, exception.getMessage());
+ repository.update(transfer.getId(), SENT, FAILED);
+ }
+ });
+ }
+ catch (JsonProcessingException e)
+ {
+ throw new RuntimeException("Could not convert " + transfer, e);
+ }
+ }
+
+ public Optional<Transfer> get(Long id)
+ {
+ return repository.get(id);
+ }
+}
--- /dev/null
+package de.juplo.kafka.payment.transfer.persistence;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import de.juplo.kafka.payment.transfer.domain.Transfer;
+import de.juplo.kafka.payment.transfer.domain.TransferRepository;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+
+@Component
+@RequiredArgsConstructor
+@Slf4j
+public class InMemoryTransferRepository implements TransferRepository
+{
+ private final Map<Long, String> map = new HashMap<>();
+ private final ObjectMapper mapper;
+
+
+ @Override
+ public synchronized void store(Transfer transfer)
+ {
+ Optional
+ .ofNullable(map.get(transfer.getId()))
+ .ifPresent(
+ json ->
+ {
+ throw new IllegalArgumentException("Could not overwrite " + json + " with " + transfer);
+ });
+
+ try
+ {
+ map.put(transfer.getId(), mapper.writeValueAsString(transfer));
+ }
+ catch (JsonProcessingException e)
+ {
+ log.error("Could not convert Transfer.class: {}", transfer, e);
+ }
+ }
+
+ @Override
+ public synchronized Optional<Transfer> get(Long id)
+ {
+ return
+ Optional
+ .ofNullable(map.get(id))
+ .map(json -> {
+ try
+ {
+ return mapper.readValue(json, Transfer.class);
+ }
+ catch (JsonProcessingException e)
+ {
+ throw new RuntimeException("Could not convert JSON: " + json, e);
+ }
+ });
+ }
+
+ @Override
+ public synchronized void update(Long id, Transfer.State oldState, Transfer.State newState)
+ {
+ Transfer transfer = get(id).orElseThrow(() -> new IllegalArgumentException("Could not find transfer " + id));
+
+ if (transfer.getState() != oldState)
+ throw new IllegalArgumentException(("Unexpectd state for " + transfer + ", expected: " + oldState));
+
+ transfer.setState(newState);
+ store(transfer);
+ }
+
+ @Override
+ public void remove(Long id)
+ {
+ map.remove(id);
+ }
+}
--- /dev/null
+management.endpoints.web.exposure.include=*
+
+logging.level.de.trion=info
--- /dev/null
+package de.juplo.kafka.payment.transfer;
+
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.test.context.SpringBootTest;
+
+@SpringBootTest
+class TransferServiceApplicationTests {
+
+ @Test
+ void contextLoads() {
+ }
+
+}
+++ /dev/null
-*
-!target/*.jar
+++ /dev/null
-HELP.md
-target/
-!.mvn/wrapper/maven-wrapper.jar
-!**/src/main/**/target/
-!**/src/test/**/target/
-
-### STS ###
-.apt_generated
-.classpath
-.factorypath
-.project
-.settings
-.springBeans
-.sts4-cache
-
-### IntelliJ IDEA ###
-.idea
-*.iws
-*.iml
-*.ipr
-
-### NetBeans ###
-/nbproject/private/
-/nbbuild/
-/dist/
-/nbdist/
-/.nb-gradle/
-build/
-!**/src/main/**/build/
-!**/src/test/**/build/
-
-### VS Code ###
-.vscode/
+++ /dev/null
-FROM openjdk:8-jre-slim
-COPY target/take-order-01-0-SNAPSHOT.jar /opt/
-EXPOSE 8080
-CMD ["java", "-jar", "/opt/take-order-01-0-SNAPSHOT.jar"]
+++ /dev/null
-<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-parent</artifactId>
- <version>2.5.1</version>
- <relativePath/> <!-- lookup parent from repository -->
- </parent>
- <groupId>de.juplo.kafka.payment</groupId>
- <artifactId>transfer</artifactId>
- <version>1.0-SNAPSHOT</version>
- <name>Transfer Service</name>
- <description>An MVP for the Transfer Service</description>
- <properties>
- <java.version>11</java.version>
- <confluent.version>6.2.0</confluent.version>
- <kafka.version>2.8.0</kafka.version>
- </properties>
- <dependencies>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-actuator</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-validation</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-clients</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-devtools</artifactId>
- <scope>runtime</scope>
- <optional>true</optional>
- </dependency>
- <dependency>
- <groupId>jakarta.validation</groupId>
- <artifactId>jakarta.validation-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.projectlombok</groupId>
- <artifactId>lombok</artifactId>
- <optional>true</optional>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-test</artifactId>
- <scope>test</scope>
- </dependency>
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-maven-plugin</artifactId>
- <configuration>
- <excludes>
- <exclude>
- <groupId>org.projectlombok</groupId>
- <artifactId>lombok</artifactId>
- </exclude>
- </excludes>
- </configuration>
- </plugin>
- </plugins>
- </build>
-
-</project>
+++ /dev/null
-package de.juplo.kafka.payment.transfer;
-
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import de.juplo.kafka.payment.transfer.domain.TransferRepository;
-import de.juplo.kafka.payment.transfer.domain.TransferService;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.Bean;
-
-import java.util.Properties;
-import java.util.UUID;
-
-
-@SpringBootApplication
-@EnableConfigurationProperties(TransferServiceProperties.class)
-@Slf4j
-public class TransferServiceApplication
-{
- @Bean(destroyMethod = "close")
- KafkaProducer<String, String> producer(TransferServiceProperties properties)
- {
- Properties props = new Properties();
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.bootstrapServers);
- props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-
- return new KafkaProducer<>(props);
- }
-
- @Bean
- TransferService transferService(
- TransferRepository repository,
- KafkaProducer<String, String> producer,
- ObjectMapper mapper,
- TransferServiceProperties properties)
- {
- return new TransferService(repository, producer, mapper, properties.topic);
- }
-
-
- public static void main(String[] args)
- {
- SpringApplication.run(TransferServiceApplication.class, args);
- }
-}
+++ /dev/null
-package de.juplo.kafka.payment.transfer;
-
-
-import lombok.Getter;
-import lombok.Setter;
-import org.springframework.boot.context.properties.ConfigurationProperties;
-
-
-@ConfigurationProperties("juplo.transfer")
-@Getter
-@Setter
-public class TransferServiceProperties
-{
- String bootstrapServers = "localhost:9092";
- String topic = "transfers";
-}
+++ /dev/null
-package de.juplo.kafka.payment.transfer.controller;
-
-
-import de.juplo.kafka.payment.transfer.domain.Transfer;
-import de.juplo.kafka.payment.transfer.domain.TransferService;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.http.HttpStatus;
-import org.springframework.http.MediaType;
-import org.springframework.http.ResponseEntity;
-import org.springframework.validation.FieldError;
-import org.springframework.web.bind.MethodArgumentNotValidException;
-import org.springframework.web.bind.annotation.*;
-
-import javax.servlet.http.HttpServletRequest;
-import javax.validation.Valid;
-import java.net.URI;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-
-
-@RestController
-@RequiredArgsConstructor
-@Slf4j
- public class TransferController
-{
- public final static String PATH = "/transfers";
-
- private final TransferService service;
-
-
- @PostMapping(
- path = PATH,
- consumes = MediaType.APPLICATION_JSON_VALUE,
- produces = MediaType.APPLICATION_JSON_VALUE)
- public ResponseEntity<?> transfer(@Valid @RequestBody TransferDTO transferDTO)
- {
- Transfer transfer =
- Transfer
- .builder()
- .id(transferDTO.getId())
- .payer(transferDTO.getPayer())
- .payee(transferDTO.getPayee())
- .amount(transferDTO.getAmount())
- .build();
-
- service.initiate(transfer);
-
- return ResponseEntity.created(URI.create(PATH + "/" + transferDTO.getId())).build();
- }
-
- @GetMapping(
- path = PATH + "/{id}",
- produces = MediaType.APPLICATION_JSON_VALUE)
- public ResponseEntity<TransferDTO> get(@PathVariable Long id)
- {
- return
- service
- .get(id)
- .map(transfer -> ResponseEntity.ok(TransferDTO.of(transfer)))
- .orElse(ResponseEntity.notFound().build());
- }
-
- @ResponseStatus(HttpStatus.BAD_REQUEST)
- @ExceptionHandler(MethodArgumentNotValidException.class)
- public Map<String, Object> handleValidationExceptions(
- HttpServletRequest request,
- MethodArgumentNotValidException e)
- {
- Map<String, Object> errorAttributes = new HashMap<>();
- errorAttributes.put("status", HttpStatus.BAD_REQUEST.value());
- errorAttributes.put("error", HttpStatus.BAD_REQUEST.getReasonPhrase());
- errorAttributes.put("path", request.getRequestURI());
- errorAttributes.put("method", request.getMethod());
- errorAttributes.put("timestamp", new Date());
- Map<String, String> errors = new HashMap<>();
- e.getBindingResult().getAllErrors().forEach((error) -> {
- String fieldName = ((FieldError) error).getField();
- String errorMessage = error.getDefaultMessage();
- errors.put(fieldName, errorMessage);
- });
- errorAttributes.put("errors", errors);
- errorAttributes.put("message", "Validation failed: Invalid message format, error count: " + errors.size());
- return errorAttributes;
- }
-}
+++ /dev/null
-package de.juplo.kafka.payment.transfer.controller;
-
-import de.juplo.kafka.payment.transfer.domain.Transfer;
-import lombok.Builder;
-import lombok.Data;
-
-import javax.validation.constraints.Min;
-import javax.validation.constraints.NotNull;
-
-
-/**
- * Simple DTO used by the REST interface
- */
-@Data
-@Builder
-public class TransferDTO
-{
- @NotNull(message = "Cannot be null")
- @Min(value = 1, message = "A valid transfer id must be a positive number")
- private Long id;
- @NotNull(message = "Cannot be null")
- @Min(value = 1, message = "A valid bank account id must be a positive number")
- private Long payer;
- @NotNull(message = "Cannot be null")
- @Min(value = 1, message = "A valid bank account id must be a positive number")
- private Long payee;
- @NotNull(message = "Cannot be null")
- @Min(value = 1, message = "The amount of a transfer must be a positv value")
- private Integer amount;
-
- private Transfer.State state;
-
-
- public Transfer toTransfer()
- {
- return
- Transfer
- .builder()
- .id(id)
- .payer(payer)
- .payee(payee)
- .amount(amount)
- .build();
- }
-
-
- public static TransferDTO of(Transfer transfer)
- {
- return
- TransferDTO
- .builder()
- .id(transfer.getId())
- .payer(transfer.getPayer())
- .payee(transfer.getPayee())
- .amount(transfer.getAmount())
- .state(transfer.getState())
- .build();
- }
-}
+++ /dev/null
-package de.juplo.kafka.payment.transfer.domain;
-
-
-import lombok.Builder;
-import lombok.Data;
-import lombok.EqualsAndHashCode;
-
-
-@Data
-@Builder
-@EqualsAndHashCode(exclude = "state")
-public class Transfer
-{
- public enum State
- {
- SENT,
- FAILED,
- PENDING,
- APPROVED,
- REJECTED
- }
-
- private final long id;
- private final long payer;
- private final long payee;
- private final int amount;
-
- private State state;
-}
+++ /dev/null
-package de.juplo.kafka.payment.transfer.domain;
-
-import java.util.Optional;
-
-
-public interface TransferRepository
-{
- void store(Transfer transfer);
-
- Optional<Transfer> get(Long id);
-
- void update(Long id, Transfer.State oldState, Transfer.State newState) throws IllegalArgumentException;
-
- void remove(Long id);
-}
+++ /dev/null
-package de.juplo.kafka.payment.transfer.domain;
-
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-
-import java.util.Optional;
-
-import static de.juplo.kafka.payment.transfer.domain.Transfer.State.*;
-
-
-@Slf4j
-@RequiredArgsConstructor
-public class TransferService
-{
- private final TransferRepository repository;
- private final KafkaProducer<String, String> producer;
- private final ObjectMapper mapper;
- private final String topic;
-
- public synchronized void initiate(Transfer transfer)
- {
- repository
- .get(transfer.getId())
- .ifPresentOrElse(
- stored ->
- {
- if (!transfer.equals(stored))
- throw new IllegalArgumentException(
- "Re-Initiation of transfer with different data: old=" +
- stored +
- ", new=" +
- transfer);
-
- if (stored.getState() == FAILED)
- {
- repository.update(transfer.getId(), FAILED, SENT);
- log.info("Resending faild transfer: " + stored);
- send(transfer);
- }
- },
- () ->
- {
- send(transfer);
- transfer.setState(SENT);
- repository.store(transfer);
- });
- }
-
-
- private void send(Transfer transfer)
- {
- try
- {
- ProducerRecord<String, String> record =
- new ProducerRecord<>(
- topic,
- Long.toString(transfer.getId()),
- mapper.writeValueAsString(transfer));
-
- producer.send(record, (metadata, exception) ->
- {
- if (metadata != null)
- {
- log.debug("Sent {} to {}/{}:{}", transfer, metadata.topic(), metadata.partition(), metadata.offset());
- repository.update(transfer.getId(), SENT, PENDING);
- }
- else
- {
- log.error("Could not send {}: {}", transfer, exception.getMessage());
- repository.update(transfer.getId(), SENT, FAILED);
- }
- });
- }
- catch (JsonProcessingException e)
- {
- throw new RuntimeException("Could not convert " + transfer, e);
- }
- }
-
- public Optional<Transfer> get(Long id)
- {
- return repository.get(id);
- }
-}
+++ /dev/null
-package de.juplo.kafka.payment.transfer.persistence;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import de.juplo.kafka.payment.transfer.domain.Transfer;
-import de.juplo.kafka.payment.transfer.domain.TransferRepository;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.stereotype.Component;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-
-
-@Component
-@RequiredArgsConstructor
-@Slf4j
-public class InMemoryTransferRepository implements TransferRepository
-{
- private final Map<Long, String> map = new HashMap<>();
- private final ObjectMapper mapper;
-
-
- @Override
- public synchronized void store(Transfer transfer)
- {
- Optional
- .ofNullable(map.get(transfer.getId()))
- .ifPresent(
- json ->
- {
- throw new IllegalArgumentException("Could not overwrite " + json + " with " + transfer);
- });
-
- try
- {
- map.put(transfer.getId(), mapper.writeValueAsString(transfer));
- }
- catch (JsonProcessingException e)
- {
- log.error("Could not convert Transfer.class: {}", transfer, e);
- }
- }
-
- @Override
- public synchronized Optional<Transfer> get(Long id)
- {
- return
- Optional
- .ofNullable(map.get(id))
- .map(json -> {
- try
- {
- return mapper.readValue(json, Transfer.class);
- }
- catch (JsonProcessingException e)
- {
- throw new RuntimeException("Could not convert JSON: " + json, e);
- }
- });
- }
-
- @Override
- public synchronized void update(Long id, Transfer.State oldState, Transfer.State newState)
- {
- Transfer transfer = get(id).orElseThrow(() -> new IllegalArgumentException("Could not find transfer " + id));
-
- if (transfer.getState() != oldState)
- throw new IllegalArgumentException(("Unexpectd state for " + transfer + ", expected: " + oldState));
-
- transfer.setState(newState);
- store(transfer);
- }
-
- @Override
- public void remove(Long id)
- {
- map.remove(id);
- }
-}
+++ /dev/null
-management.endpoints.web.exposure.include=*
-
-logging.level.de.trion=info
+++ /dev/null
-package de.juplo.kafka.payment.transfer;
-
-import org.junit.jupiter.api.Test;
-import org.springframework.boot.test.context.SpringBootTest;
-
-@SpringBootTest
-class TransferServiceApplicationTests {
-
- @Test
- void contextLoads() {
- }
-
-}