--- /dev/null
+HELP.md
+target/
+!.mvn/wrapper/maven-wrapper.jar
+!**/src/main/**/target/
+!**/src/test/**/target/
+
+### STS ###
+.apt_generated
+.classpath
+.factorypath
+.project
+.settings
+.springBeans
+.sts4-cache
+
+### IntelliJ IDEA ###
+.idea
+*.iws
+*.iml
+*.ipr
+
+### NetBeans ###
+/nbproject/private/
+/nbbuild/
+/dist/
+/nbdist/
+/.nb-gradle/
+build/
+!**/src/main/**/build/
+!**/src/test/**/build/
+
+### VS Code ###
+.vscode/
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-parent</artifactId>
+ <version>2.5.1</version>
+ <relativePath/> <!-- lookup parent from repository -->
+ </parent>
+
+ <groupId>de.juplo.kafka.payment</groupId>
+ <artifactId>transfer</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ <name>Transfer Service</name>
+ <description>An MVP for the Transfer Service</description>
+
+ <properties>
+ <java.version>11</java.version>
+ <confluent.version>6.2.0</confluent.version>
+ <kafka.version>2.8.0</kafka.version>
+ </properties>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-actuator</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-web</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-validation</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>jakarta.validation</groupId>
+ <artifactId>jakarta.validation-api</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.projectlombok</groupId>
+ <artifactId>lombok</artifactId>
+ <optional>true</optional>
+ </dependency>
+
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-devtools</artifactId>
+ <scope>runtime</scope>
+ <optional>true</optional>
+ </dependency>
+
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-test</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-maven-plugin</artifactId>
+ <configuration>
+ <excludes>
+ <exclude>
+ <groupId>org.projectlombok</groupId>
+ <artifactId>lombok</artifactId>
+ </exclude>
+ </excludes>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
--- /dev/null
+package de.juplo.kafka.payment.transfer;
+
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import de.juplo.kafka.payment.transfer.adapter.KafkaMessagingService;
+import de.juplo.kafka.payment.transfer.domain.TransferService;
+import de.juplo.kafka.payment.transfer.ports.MessagingService;
+import de.juplo.kafka.payment.transfer.ports.TransferRepository;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+
+import java.util.Properties;
+
+
+@SpringBootApplication
+@EnableConfigurationProperties(TransferServiceProperties.class)
+@Slf4j
+public class TransferServiceApplication
+{
+ @Bean(destroyMethod = "close")
+ KafkaProducer<String, String> producer(TransferServiceProperties properties)
+ {
+ Properties props = new Properties();
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.bootstrapServers);
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+
+ return new KafkaProducer<>(props);
+ }
+
+ @Bean
+ MessagingService kafkaMessagingService(
+ KafkaProducer<String, String> producer,
+ ObjectMapper mapper,
+ TransferServiceProperties properties)
+ {
+ return new KafkaMessagingService(producer, mapper, properties.topic);
+ }
+
+ @Bean
+ TransferService transferService(
+ TransferRepository repository,
+ MessagingService messagingService)
+ {
+ return new TransferService(repository, messagingService);
+ }
+
+
+ public static void main(String[] args)
+ {
+ SpringApplication.run(TransferServiceApplication.class, args);
+ }
+}
--- /dev/null
+package de.juplo.kafka.payment.transfer;
+
+
+import lombok.Getter;
+import lombok.Setter;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+
+@ConfigurationProperties("juplo.transfer")
+@Getter
+@Setter
+public class TransferServiceProperties
+{
+ String bootstrapServers = "localhost:9092";
+ String topic = "transfers";
+}
--- /dev/null
+package de.juplo.kafka.payment.transfer.adapter;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import de.juplo.kafka.payment.transfer.domain.Transfer;
+import de.juplo.kafka.payment.transfer.ports.MessagingService;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.concurrent.CompletableFuture;
+
+
+@RequiredArgsConstructor
+@Slf4j
+public class KafkaMessagingService implements MessagingService
+{
+ private final KafkaProducer<String, String> producer;
+ private final ObjectMapper mapper;
+ private final String topic;
+
+
+ @Override
+ public CompletableFuture send(Transfer transfer)
+ {
+ try
+ {
+ CompletableFuture<TopicPartition> future = new CompletableFuture<>();
+ ProducerRecord<String, String> record =
+ new ProducerRecord<>(
+ topic,
+ Long.toString(transfer.getId()),
+ mapper.writeValueAsString(transfer));
+
+ producer.send(record, (metadata, exception) ->
+ {
+ if (metadata != null)
+ {
+ log.debug("Sent {} to {}/{}:{}", transfer, metadata.topic(), metadata.partition(), metadata.offset());
+ future.complete(new TopicPartition(metadata.topic(), metadata.partition()));
+ }
+ else
+ {
+ log.error("Could not send {}: {}", transfer, exception.getMessage());
+ future.completeExceptionally(exception);
+ }
+ });
+
+ return future;
+ }
+ catch (JsonProcessingException e)
+ {
+ throw new RuntimeException("Could not convert " + transfer, e);
+ }
+ }
+
+}
--- /dev/null
+package de.juplo.kafka.payment.transfer.adapter;
+
+
+import de.juplo.kafka.payment.transfer.domain.Transfer;
+import de.juplo.kafka.payment.transfer.ports.GetTransferUseCase;
+import de.juplo.kafka.payment.transfer.ports.InitiateTransferUseCase;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.MediaType;
+import org.springframework.http.ResponseEntity;
+import org.springframework.validation.FieldError;
+import org.springframework.web.bind.MethodArgumentNotValidException;
+import org.springframework.web.bind.annotation.*;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.validation.Valid;
+import java.net.URI;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+
+@RestController
+@RequiredArgsConstructor
+@Slf4j
+ public class TransferController
+{
+ public final static String PATH = "/transfers";
+
+ private final InitiateTransferUseCase initiateTransferUseCase;
+ private final GetTransferUseCase getTransferUseCase;
+
+
+ @PostMapping(
+ path = PATH,
+ consumes = MediaType.APPLICATION_JSON_VALUE,
+ produces = MediaType.APPLICATION_JSON_VALUE)
+ public ResponseEntity<?> transfer(@Valid @RequestBody TransferDTO transferDTO)
+ {
+ Transfer transfer =
+ Transfer
+ .builder()
+ .id(transferDTO.getId())
+ .payer(transferDTO.getPayer())
+ .payee(transferDTO.getPayee())
+ .amount(transferDTO.getAmount())
+ .build();
+
+ initiateTransferUseCase.initiate(transfer);
+
+ return ResponseEntity.created(URI.create(PATH + "/" + transferDTO.getId())).build();
+ }
+
+ @GetMapping(
+ path = PATH + "/{id}",
+ produces = MediaType.APPLICATION_JSON_VALUE)
+ public ResponseEntity<TransferDTO> get(@PathVariable Long id)
+ {
+ return
+ getTransferUseCase
+ .get(id)
+ .map(transfer -> ResponseEntity.ok(TransferDTO.of(transfer)))
+ .orElse(ResponseEntity.notFound().build());
+ }
+
+ @ResponseStatus(HttpStatus.BAD_REQUEST)
+ @ExceptionHandler(MethodArgumentNotValidException.class)
+ public Map<String, Object> handleValidationExceptions(
+ HttpServletRequest request,
+ MethodArgumentNotValidException e)
+ {
+ Map<String, Object> errorAttributes = new HashMap<>();
+ errorAttributes.put("status", HttpStatus.BAD_REQUEST.value());
+ errorAttributes.put("error", HttpStatus.BAD_REQUEST.getReasonPhrase());
+ errorAttributes.put("path", request.getRequestURI());
+ errorAttributes.put("method", request.getMethod());
+ errorAttributes.put("timestamp", new Date());
+ Map<String, String> errors = new HashMap<>();
+ e.getBindingResult().getAllErrors().forEach((error) -> {
+ String fieldName = ((FieldError) error).getField();
+ String errorMessage = error.getDefaultMessage();
+ errors.put(fieldName, errorMessage);
+ });
+ errorAttributes.put("errors", errors);
+ errorAttributes.put("message", "Validation failed: Invalid message format, error count: " + errors.size());
+ return errorAttributes;
+ }
+}
--- /dev/null
+package de.juplo.kafka.payment.transfer.adapter;
+
+import de.juplo.kafka.payment.transfer.domain.Transfer;
+import lombok.Builder;
+import lombok.Data;
+
+import javax.validation.constraints.Min;
+import javax.validation.constraints.NotNull;
+
+
+/**
+ * Simple DTO used by the REST interface
+ */
+@Data
+@Builder
+public class TransferDTO
+{
+ @NotNull(message = "Cannot be null")
+ @Min(value = 1, message = "A valid transfer id must be a positive number")
+ private Long id;
+ @NotNull(message = "Cannot be null")
+ @Min(value = 1, message = "A valid bank account id must be a positive number")
+ private Long payer;
+ @NotNull(message = "Cannot be null")
+ @Min(value = 1, message = "A valid bank account id must be a positive number")
+ private Long payee;
+ @NotNull(message = "Cannot be null")
+ @Min(value = 1, message = "The amount of a transfer must be a positv value")
+ private Integer amount;
+
+ private Transfer.State state;
+
+
+ public Transfer toTransfer()
+ {
+ return
+ Transfer
+ .builder()
+ .id(id)
+ .payer(payer)
+ .payee(payee)
+ .amount(amount)
+ .build();
+ }
+
+
+ public static TransferDTO of(Transfer transfer)
+ {
+ return
+ TransferDTO
+ .builder()
+ .id(transfer.getId())
+ .payer(transfer.getPayer())
+ .payee(transfer.getPayee())
+ .amount(transfer.getAmount())
+ .state(transfer.getState())
+ .build();
+ }
+}
--- /dev/null
+package de.juplo.kafka.payment.transfer.domain;
+
+
+import lombok.Builder;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+
+@Data
+@Builder
+@EqualsAndHashCode(exclude = "state")
+public class Transfer
+{
+ public enum State
+ {
+ SENT,
+ FAILED,
+ PENDING,
+ APPROVED,
+ REJECTED
+ }
+
+ private final long id;
+ private final long payer;
+ private final long payee;
+ private final int amount;
+
+ private State state;
+}
--- /dev/null
+package de.juplo.kafka.payment.transfer.domain;
+
+
+import de.juplo.kafka.payment.transfer.ports.GetTransferUseCase;
+import de.juplo.kafka.payment.transfer.ports.InitiateTransferUseCase;
+import de.juplo.kafka.payment.transfer.ports.MessagingService;
+import de.juplo.kafka.payment.transfer.ports.TransferRepository;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.Optional;
+
+import static de.juplo.kafka.payment.transfer.domain.Transfer.State.*;
+
+
+@Slf4j
+@RequiredArgsConstructor
+public class TransferService implements InitiateTransferUseCase, GetTransferUseCase
+{
+ private final TransferRepository repository;
+ private final MessagingService messagingService;
+
+ public synchronized void initiate(Transfer transfer)
+ {
+ repository
+ .get(transfer.getId())
+ .ifPresentOrElse(
+ stored ->
+ {
+ if (!transfer.equals(stored))
+ throw new IllegalArgumentException(
+ "Re-Initiation of transfer with different data: old=" +
+ stored +
+ ", new=" +
+ transfer);
+
+ if (stored.getState() == FAILED)
+ {
+ repository.update(transfer.getId(), FAILED, SENT);
+ log.info("Resending faild transfer: " + stored);
+ send(transfer);
+ }
+ },
+ () ->
+ {
+ send(transfer);
+ transfer.setState(SENT);
+ repository.store(transfer);
+ });
+ }
+
+ private void send(Transfer transfer)
+ {
+ messagingService
+ .send(transfer)
+ .thenApply(
+ $ ->
+ {
+ repository.update(transfer.getId(), SENT, PENDING);
+ return null;
+ })
+ .exceptionally(
+ e ->
+ {
+ repository.update(transfer.getId(), SENT, FAILED);
+ return null;
+ });
+ }
+
+ public Optional<Transfer> get(Long id)
+ {
+ return repository.get(id);
+ }
+}
--- /dev/null
+package de.juplo.kafka.payment.transfer.persistence;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import de.juplo.kafka.payment.transfer.domain.Transfer;
+import de.juplo.kafka.payment.transfer.ports.TransferRepository;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+
+@Component
+@RequiredArgsConstructor
+@Slf4j
+public class InMemoryTransferRepository implements TransferRepository
+{
+ private final Map<Long, String> map = new HashMap<>();
+ private final ObjectMapper mapper;
+
+
+ @Override
+ public synchronized void store(Transfer transfer)
+ {
+ Optional
+ .ofNullable(map.get(transfer.getId()))
+ .ifPresentOrElse(
+ json ->
+ {
+ throw new IllegalArgumentException("Could not overwrite " + json + " with " + transfer);
+ },
+ () -> put(transfer));
+ }
+
+ private void put(Transfer transfer)
+ {
+ try
+ {
+ map.put(transfer.getId(), mapper.writeValueAsString(transfer));
+ }
+ catch (JsonProcessingException e)
+ {
+ log.error("Could not convert Transfer.class: {}", transfer, e);
+ }
+ }
+
+ @Override
+ public synchronized Optional<Transfer> get(Long id)
+ {
+ return
+ Optional
+ .ofNullable(map.get(id))
+ .map(json -> {
+ try
+ {
+ return mapper.readValue(json, Transfer.class);
+ }
+ catch (JsonProcessingException e)
+ {
+ throw new RuntimeException("Could not convert JSON: " + json, e);
+ }
+ });
+ }
+
+ @Override
+ public synchronized void update(Long id, Transfer.State oldState, Transfer.State newState)
+ {
+ Transfer transfer = get(id).orElseThrow(() -> new IllegalArgumentException("Could not find transfer " + id));
+
+ if (transfer.getState() != oldState)
+ throw new IllegalArgumentException(("Unexpectd state for " + transfer + ", expected: " + oldState));
+
+ transfer.setState(newState);
+ put(transfer);
+ }
+
+ @Override
+ public void remove(Long id)
+ {
+ map.remove(id);
+ }
+}
--- /dev/null
+package de.juplo.kafka.payment.transfer.ports;
+
+import de.juplo.kafka.payment.transfer.domain.Transfer;
+
+import java.util.Optional;
+
+
+public interface GetTransferUseCase
+{
+ Optional<Transfer> get(Long id);
+}
--- /dev/null
+package de.juplo.kafka.payment.transfer.ports;
+
+import de.juplo.kafka.payment.transfer.domain.Transfer;
+
+
+public interface InitiateTransferUseCase
+{
+ void initiate(Transfer transfer);
+}
--- /dev/null
+package de.juplo.kafka.payment.transfer.ports;
+
+import de.juplo.kafka.payment.transfer.domain.Transfer;
+
+import java.util.concurrent.CompletableFuture;
+
+
+public interface MessagingService
+{
+ CompletableFuture send(Transfer transfer);
+}
--- /dev/null
+package de.juplo.kafka.payment.transfer.ports;
+
+import de.juplo.kafka.payment.transfer.domain.Transfer;
+
+import java.util.Optional;
+
+
+public interface TransferRepository
+{
+ void store(Transfer transfer);
+
+ Optional<Transfer> get(Long id);
+
+ void update(Long id, Transfer.State oldState, Transfer.State newState) throws IllegalArgumentException;
+
+ void remove(Long id);
+}
--- /dev/null
+management.endpoints.web.exposure.include=*
+
+logging.level.de.juplo=debug
--- /dev/null
+package de.juplo.kafka.payment.transfer;
+
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.test.context.SpringBootTest;
+
+@SpringBootTest
+class TransferServiceApplicationTests
+{
+ @Test
+ void contextLoads()
+ {
+ }
+}
--- /dev/null
+package de.juplo.kafka.payment.transfer.domain;
+
+import org.junit.jupiter.api.Test;
+
+import static de.juplo.kafka.payment.transfer.domain.Transfer.State.PENDING;
+import static de.juplo.kafka.payment.transfer.domain.Transfer.State.SENT;
+import static org.assertj.core.api.Assertions.assertThat;
+
+
+public class TransferTest
+{
+ @Test
+ public void testEqualsIgnoresState()
+ {
+ Transfer a = Transfer.builder().id(1).payer(1).payee(1).amount(1).state(SENT).build();
+ Transfer b = Transfer.builder().id(1).payer(1).payee(1).amount(1).state(PENDING).build();
+
+ assertThat(a).isEqualTo(b);
+ }
+}