WIP wip-initialer-commit
authorKai Moritz <kai@juplo.de>
Sun, 13 Jun 2021 08:48:28 +0000 (10:48 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 13 Jun 2021 08:48:28 +0000 (10:48 +0200)
28 files changed:
.dockerignore [new file with mode: 0644]
.gitignore [new file with mode: 0644]
Dockerfile [new file with mode: 0644]
pom.xml [new file with mode: 0644]
src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/payment/transfer/TransferServiceProperties.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/payment/transfer/controller/TransferController.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/payment/transfer/controller/TransferDTO.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/payment/transfer/domain/Transfer.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/payment/transfer/domain/TransferRepository.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/payment/transfer/domain/TransferService.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/payment/transfer/persistence/InMemoryTransferRepository.java [new file with mode: 0644]
src/main/resources/application.properties [new file with mode: 0644]
src/test/java/de/juplo/kafka/payment/transfer/TransferServiceApplicationTests.java [new file with mode: 0644]
transfer/.dockerignore [deleted file]
transfer/.gitignore [deleted file]
transfer/Dockerfile [deleted file]
transfer/pom.xml [deleted file]
transfer/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java [deleted file]
transfer/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceProperties.java [deleted file]
transfer/src/main/java/de/juplo/kafka/payment/transfer/controller/TransferController.java [deleted file]
transfer/src/main/java/de/juplo/kafka/payment/transfer/controller/TransferDTO.java [deleted file]
transfer/src/main/java/de/juplo/kafka/payment/transfer/domain/Transfer.java [deleted file]
transfer/src/main/java/de/juplo/kafka/payment/transfer/domain/TransferRepository.java [deleted file]
transfer/src/main/java/de/juplo/kafka/payment/transfer/domain/TransferService.java [deleted file]
transfer/src/main/java/de/juplo/kafka/payment/transfer/persistence/InMemoryTransferRepository.java [deleted file]
transfer/src/main/resources/application.properties [deleted file]
transfer/src/test/java/de/juplo/kafka/payment/transfer/TransferServiceApplicationTests.java [deleted file]

diff --git a/.dockerignore b/.dockerignore
new file mode 100644 (file)
index 0000000..1ad9963
--- /dev/null
@@ -0,0 +1,2 @@
+*
+!target/*.jar
diff --git a/.gitignore b/.gitignore
new file mode 100644 (file)
index 0000000..549e00a
--- /dev/null
@@ -0,0 +1,33 @@
+HELP.md
+target/
+!.mvn/wrapper/maven-wrapper.jar
+!**/src/main/**/target/
+!**/src/test/**/target/
+
+### STS ###
+.apt_generated
+.classpath
+.factorypath
+.project
+.settings
+.springBeans
+.sts4-cache
+
+### IntelliJ IDEA ###
+.idea
+*.iws
+*.iml
+*.ipr
+
+### NetBeans ###
+/nbproject/private/
+/nbbuild/
+/dist/
+/nbdist/
+/.nb-gradle/
+build/
+!**/src/main/**/build/
+!**/src/test/**/build/
+
+### VS Code ###
+.vscode/
diff --git a/Dockerfile b/Dockerfile
new file mode 100644 (file)
index 0000000..cd6a95b
--- /dev/null
@@ -0,0 +1,4 @@
+FROM openjdk:8-jre-slim
+COPY target/take-order-01-0-SNAPSHOT.jar /opt/
+EXPOSE 8080
+CMD ["java", "-jar", "/opt/take-order-01-0-SNAPSHOT.jar"]
diff --git a/pom.xml b/pom.xml
new file mode 100644 (file)
index 0000000..8a26995
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,78 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.springframework.boot</groupId>
+    <artifactId>spring-boot-starter-parent</artifactId>
+    <version>2.5.1</version>
+    <relativePath/> <!-- lookup parent from repository -->
+  </parent>
+  <groupId>de.juplo.kafka.payment</groupId>
+  <artifactId>transfer</artifactId>
+  <version>1.0-SNAPSHOT</version>
+  <name>Transfer Service</name>
+  <description>An MVP for the Transfer Service</description>
+  <properties>
+    <java.version>11</java.version>
+    <confluent.version>6.2.0</confluent.version>
+    <kafka.version>2.8.0</kafka.version>
+  </properties>
+  <dependencies>
+    <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-starter-actuator</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-starter-web</artifactId>
+    </dependency>
+    <dependency> 
+      <groupId>org.springframework.boot</groupId> 
+      <artifactId>spring-boot-starter-validation</artifactId> 
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-devtools</artifactId>
+      <scope>runtime</scope>
+      <optional>true</optional>
+    </dependency>
+    <dependency>
+      <groupId>jakarta.validation</groupId>
+      <artifactId>jakarta.validation-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.projectlombok</groupId>
+      <artifactId>lombok</artifactId>
+      <optional>true</optional>
+    </dependency>
+    <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-starter-test</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.springframework.boot</groupId>
+        <artifactId>spring-boot-maven-plugin</artifactId>
+        <configuration>
+          <excludes>
+            <exclude>
+              <groupId>org.projectlombok</groupId>
+              <artifactId>lombok</artifactId>
+            </exclude>
+          </excludes>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>
diff --git a/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java b/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java
new file mode 100644 (file)
index 0000000..320b841
--- /dev/null
@@ -0,0 +1,51 @@
+package de.juplo.kafka.payment.transfer;
+
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import de.juplo.kafka.payment.transfer.domain.TransferRepository;
+import de.juplo.kafka.payment.transfer.domain.TransferService;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+
+import java.util.Properties;
+import java.util.UUID;
+
+
+@SpringBootApplication
+@EnableConfigurationProperties(TransferServiceProperties.class)
+@Slf4j
+public class TransferServiceApplication
+{
+  @Bean(destroyMethod = "close")
+  KafkaProducer<String, String> producer(TransferServiceProperties properties)
+  {
+    Properties props = new Properties();
+    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.bootstrapServers);
+    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+
+    return new KafkaProducer<>(props);
+  }
+
+  @Bean
+  TransferService transferService(
+      TransferRepository repository,
+      KafkaProducer<String, String> producer,
+      ObjectMapper mapper,
+      TransferServiceProperties properties)
+  {
+    return new TransferService(repository, producer, mapper, properties.topic);
+  }
+
+
+  public static void main(String[] args)
+  {
+    SpringApplication.run(TransferServiceApplication.class, args);
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceProperties.java b/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceProperties.java
new file mode 100644 (file)
index 0000000..ccd22a3
--- /dev/null
@@ -0,0 +1,16 @@
+package de.juplo.kafka.payment.transfer;
+
+
+import lombok.Getter;
+import lombok.Setter;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+
+@ConfigurationProperties("juplo.transfer")
+@Getter
+@Setter
+public class TransferServiceProperties
+{
+  String bootstrapServers = "localhost:9092";
+  String topic = "transfers";
+}
diff --git a/src/main/java/de/juplo/kafka/payment/transfer/controller/TransferController.java b/src/main/java/de/juplo/kafka/payment/transfer/controller/TransferController.java
new file mode 100644 (file)
index 0000000..2d83ad9
--- /dev/null
@@ -0,0 +1,88 @@
+package de.juplo.kafka.payment.transfer.controller;
+
+
+import de.juplo.kafka.payment.transfer.domain.Transfer;
+import de.juplo.kafka.payment.transfer.domain.TransferService;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.MediaType;
+import org.springframework.http.ResponseEntity;
+import org.springframework.validation.FieldError;
+import org.springframework.web.bind.MethodArgumentNotValidException;
+import org.springframework.web.bind.annotation.*;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.validation.Valid;
+import java.net.URI;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+
+@RestController
+@RequiredArgsConstructor
+@Slf4j
+ public class TransferController
+{
+  public final static String PATH = "/transfers";
+
+  private final TransferService service;
+
+
+  @PostMapping(
+      path = PATH,
+      consumes = MediaType.APPLICATION_JSON_VALUE,
+      produces = MediaType.APPLICATION_JSON_VALUE)
+  public ResponseEntity<?> transfer(@Valid @RequestBody TransferDTO transferDTO)
+  {
+    Transfer transfer =
+        Transfer
+            .builder()
+            .id(transferDTO.getId())
+            .payer(transferDTO.getPayer())
+            .payee(transferDTO.getPayee())
+            .amount(transferDTO.getAmount())
+            .build();
+
+    service.initiate(transfer);
+
+    return ResponseEntity.created(URI.create(PATH + "/" + transferDTO.getId())).build();
+  }
+
+  @GetMapping(
+      path = PATH + "/{id}",
+      produces = MediaType.APPLICATION_JSON_VALUE)
+  public ResponseEntity<TransferDTO> get(@PathVariable Long id)
+  {
+    return
+        service
+            .get(id)
+            .map(transfer -> ResponseEntity.ok(TransferDTO.of(transfer)))
+            .orElse(ResponseEntity.notFound().build());
+  }
+
+  @ResponseStatus(HttpStatus.BAD_REQUEST)
+  @ExceptionHandler(MethodArgumentNotValidException.class)
+  public Map<String, Object> handleValidationExceptions(
+      HttpServletRequest request,
+      MethodArgumentNotValidException e)
+  {
+    Map<String, Object> errorAttributes = new HashMap<>();
+    errorAttributes.put("status", HttpStatus.BAD_REQUEST.value());
+    errorAttributes.put("error", HttpStatus.BAD_REQUEST.getReasonPhrase());
+    errorAttributes.put("path", request.getRequestURI());
+    errorAttributes.put("method", request.getMethod());
+    errorAttributes.put("timestamp", new Date());
+    Map<String, String> errors = new HashMap<>();
+    e.getBindingResult().getAllErrors().forEach((error) -> {
+      String fieldName = ((FieldError) error).getField();
+      String errorMessage = error.getDefaultMessage();
+      errors.put(fieldName, errorMessage);
+    });
+    errorAttributes.put("errors", errors);
+    errorAttributes.put("message", "Validation failed: Invalid message format, error count: " + errors.size());
+    return errorAttributes;
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/payment/transfer/controller/TransferDTO.java b/src/main/java/de/juplo/kafka/payment/transfer/controller/TransferDTO.java
new file mode 100644 (file)
index 0000000..ad4f57d
--- /dev/null
@@ -0,0 +1,59 @@
+package de.juplo.kafka.payment.transfer.controller;
+
+import de.juplo.kafka.payment.transfer.domain.Transfer;
+import lombok.Builder;
+import lombok.Data;
+
+import javax.validation.constraints.Min;
+import javax.validation.constraints.NotNull;
+
+
+/**
+ * Simple DTO used by the REST interface
+ */
+@Data
+@Builder
+public class TransferDTO
+{
+  @NotNull(message = "Cannot be null")
+  @Min(value = 1, message = "A valid transfer id must be a positive number")
+  private Long id;
+  @NotNull(message = "Cannot be null")
+  @Min(value = 1, message = "A valid bank account id must be a positive number")
+  private Long payer;
+  @NotNull(message = "Cannot be null")
+  @Min(value = 1, message = "A valid bank account id must be a positive number")
+  private Long payee;
+  @NotNull(message = "Cannot be null")
+  @Min(value = 1, message = "The amount of a transfer must be a positv value")
+  private Integer amount;
+
+  private Transfer.State state;
+
+
+  public Transfer toTransfer()
+  {
+    return
+        Transfer
+            .builder()
+            .id(id)
+            .payer(payer)
+            .payee(payee)
+            .amount(amount)
+            .build();
+  }
+
+
+  public static TransferDTO of(Transfer transfer)
+  {
+    return
+        TransferDTO
+            .builder()
+            .id(transfer.getId())
+            .payer(transfer.getPayer())
+            .payee(transfer.getPayee())
+            .amount(transfer.getAmount())
+            .state(transfer.getState())
+            .build();
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/payment/transfer/domain/Transfer.java b/src/main/java/de/juplo/kafka/payment/transfer/domain/Transfer.java
new file mode 100644 (file)
index 0000000..5556a1b
--- /dev/null
@@ -0,0 +1,29 @@
+package de.juplo.kafka.payment.transfer.domain;
+
+
+import lombok.Builder;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+
+@Data
+@Builder
+@EqualsAndHashCode(exclude = "state")
+public class Transfer
+{
+  public enum State
+  {
+    SENT,
+    FAILED,
+    PENDING,
+    APPROVED,
+    REJECTED
+  }
+
+  private final long id;
+  private final long payer;
+  private final long payee;
+  private final int amount;
+
+  private State state;
+}
diff --git a/src/main/java/de/juplo/kafka/payment/transfer/domain/TransferRepository.java b/src/main/java/de/juplo/kafka/payment/transfer/domain/TransferRepository.java
new file mode 100644 (file)
index 0000000..36d027c
--- /dev/null
@@ -0,0 +1,15 @@
+package de.juplo.kafka.payment.transfer.domain;
+
+import java.util.Optional;
+
+
+public interface TransferRepository
+{
+  void store(Transfer transfer);
+
+  Optional<Transfer> get(Long id);
+
+  void update(Long id, Transfer.State oldState, Transfer.State newState) throws IllegalArgumentException;
+
+  void remove(Long id);
+}
diff --git a/src/main/java/de/juplo/kafka/payment/transfer/domain/TransferService.java b/src/main/java/de/juplo/kafka/payment/transfer/domain/TransferService.java
new file mode 100644 (file)
index 0000000..d708826
--- /dev/null
@@ -0,0 +1,89 @@
+package de.juplo.kafka.payment.transfer.domain;
+
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import java.util.Optional;
+
+import static de.juplo.kafka.payment.transfer.domain.Transfer.State.*;
+
+
+@Slf4j
+@RequiredArgsConstructor
+public class TransferService
+{
+  private final TransferRepository repository;
+  private final KafkaProducer<String, String> producer;
+  private final ObjectMapper mapper;
+  private final String topic;
+
+  public synchronized void initiate(Transfer transfer)
+  {
+    repository
+        .get(transfer.getId())
+        .ifPresentOrElse(
+            stored ->
+            {
+              if (!transfer.equals(stored))
+                throw new IllegalArgumentException(
+                    "Re-Initiation of transfer with different data: old=" +
+                        stored +
+                        ", new=" +
+                        transfer);
+
+              if (stored.getState() == FAILED)
+              {
+                repository.update(transfer.getId(), FAILED, SENT);
+                log.info("Resending faild transfer: " + stored);
+                send(transfer);
+              }
+            },
+            () ->
+            {
+              send(transfer);
+              transfer.setState(SENT);
+              repository.store(transfer);
+            });
+  }
+
+
+  private void send(Transfer transfer)
+  {
+    try
+    {
+      ProducerRecord<String, String> record =
+          new ProducerRecord<>(
+              topic,
+              Long.toString(transfer.getId()),
+              mapper.writeValueAsString(transfer));
+
+      producer.send(record, (metadata, exception) ->
+      {
+        if (metadata != null)
+        {
+          log.debug("Sent {} to {}/{}:{}", transfer, metadata.topic(), metadata.partition(), metadata.offset());
+          repository.update(transfer.getId(), SENT, PENDING);
+        }
+        else
+        {
+          log.error("Could not send {}: {}", transfer, exception.getMessage());
+          repository.update(transfer.getId(), SENT, FAILED);
+        }
+      });
+    }
+    catch (JsonProcessingException e)
+    {
+      throw new RuntimeException("Could not convert " + transfer, e);
+    }
+  }
+
+  public Optional<Transfer> get(Long id)
+  {
+    return repository.get(id);
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/payment/transfer/persistence/InMemoryTransferRepository.java b/src/main/java/de/juplo/kafka/payment/transfer/persistence/InMemoryTransferRepository.java
new file mode 100644 (file)
index 0000000..5ef2094
--- /dev/null
@@ -0,0 +1,81 @@
+package de.juplo.kafka.payment.transfer.persistence;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import de.juplo.kafka.payment.transfer.domain.Transfer;
+import de.juplo.kafka.payment.transfer.domain.TransferRepository;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+
+@Component
+@RequiredArgsConstructor
+@Slf4j
+public class InMemoryTransferRepository implements TransferRepository
+{
+  private final Map<Long, String> map = new HashMap<>();
+  private final ObjectMapper mapper;
+
+
+  @Override
+  public synchronized void store(Transfer transfer)
+  {
+    Optional
+        .ofNullable(map.get(transfer.getId()))
+        .ifPresent(
+            json ->
+            {
+              throw new IllegalArgumentException("Could not overwrite " + json + " with " + transfer);
+            });
+
+    try
+    {
+      map.put(transfer.getId(), mapper.writeValueAsString(transfer));
+    }
+    catch (JsonProcessingException e)
+    {
+      log.error("Could not convert Transfer.class: {}", transfer, e);
+    }
+  }
+
+  @Override
+  public synchronized Optional<Transfer> get(Long id)
+  {
+    return
+        Optional
+            .ofNullable(map.get(id))
+            .map(json -> {
+              try
+              {
+                return mapper.readValue(json, Transfer.class);
+              }
+              catch (JsonProcessingException e)
+              {
+                throw new RuntimeException("Could not convert JSON: " + json, e);
+              }
+            });
+  }
+
+  @Override
+  public synchronized void update(Long id, Transfer.State oldState, Transfer.State newState)
+  {
+    Transfer transfer = get(id).orElseThrow(() -> new IllegalArgumentException("Could not find transfer " + id));
+
+    if (transfer.getState() != oldState)
+      throw new IllegalArgumentException(("Unexpectd state for " + transfer + ", expected: " + oldState));
+
+    transfer.setState(newState);
+    store(transfer);
+  }
+
+  @Override
+  public void remove(Long id)
+  {
+    map.remove(id);
+  }
+}
diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties
new file mode 100644 (file)
index 0000000..f22f985
--- /dev/null
@@ -0,0 +1,3 @@
+management.endpoints.web.exposure.include=*
+
+logging.level.de.trion=info
diff --git a/src/test/java/de/juplo/kafka/payment/transfer/TransferServiceApplicationTests.java b/src/test/java/de/juplo/kafka/payment/transfer/TransferServiceApplicationTests.java
new file mode 100644 (file)
index 0000000..29b0d02
--- /dev/null
@@ -0,0 +1,13 @@
+package de.juplo.kafka.payment.transfer;
+
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.test.context.SpringBootTest;
+
+@SpringBootTest
+class TransferServiceApplicationTests {
+
+       @Test
+       void contextLoads() {
+       }
+
+}
diff --git a/transfer/.dockerignore b/transfer/.dockerignore
deleted file mode 100644 (file)
index 1ad9963..0000000
+++ /dev/null
@@ -1,2 +0,0 @@
-*
-!target/*.jar
diff --git a/transfer/.gitignore b/transfer/.gitignore
deleted file mode 100644 (file)
index 549e00a..0000000
+++ /dev/null
@@ -1,33 +0,0 @@
-HELP.md
-target/
-!.mvn/wrapper/maven-wrapper.jar
-!**/src/main/**/target/
-!**/src/test/**/target/
-
-### STS ###
-.apt_generated
-.classpath
-.factorypath
-.project
-.settings
-.springBeans
-.sts4-cache
-
-### IntelliJ IDEA ###
-.idea
-*.iws
-*.iml
-*.ipr
-
-### NetBeans ###
-/nbproject/private/
-/nbbuild/
-/dist/
-/nbdist/
-/.nb-gradle/
-build/
-!**/src/main/**/build/
-!**/src/test/**/build/
-
-### VS Code ###
-.vscode/
diff --git a/transfer/Dockerfile b/transfer/Dockerfile
deleted file mode 100644 (file)
index cd6a95b..0000000
+++ /dev/null
@@ -1,4 +0,0 @@
-FROM openjdk:8-jre-slim
-COPY target/take-order-01-0-SNAPSHOT.jar /opt/
-EXPOSE 8080
-CMD ["java", "-jar", "/opt/take-order-01-0-SNAPSHOT.jar"]
diff --git a/transfer/pom.xml b/transfer/pom.xml
deleted file mode 100644 (file)
index 8a26995..0000000
+++ /dev/null
@@ -1,78 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-  <parent>
-    <groupId>org.springframework.boot</groupId>
-    <artifactId>spring-boot-starter-parent</artifactId>
-    <version>2.5.1</version>
-    <relativePath/> <!-- lookup parent from repository -->
-  </parent>
-  <groupId>de.juplo.kafka.payment</groupId>
-  <artifactId>transfer</artifactId>
-  <version>1.0-SNAPSHOT</version>
-  <name>Transfer Service</name>
-  <description>An MVP for the Transfer Service</description>
-  <properties>
-    <java.version>11</java.version>
-    <confluent.version>6.2.0</confluent.version>
-    <kafka.version>2.8.0</kafka.version>
-  </properties>
-  <dependencies>
-    <dependency>
-      <groupId>org.springframework.boot</groupId>
-      <artifactId>spring-boot-starter-actuator</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.springframework.boot</groupId>
-      <artifactId>spring-boot-starter-web</artifactId>
-    </dependency>
-    <dependency> 
-      <groupId>org.springframework.boot</groupId> 
-      <artifactId>spring-boot-starter-validation</artifactId> 
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.kafka</groupId>
-      <artifactId>kafka-clients</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.springframework.boot</groupId>
-      <artifactId>spring-boot-devtools</artifactId>
-      <scope>runtime</scope>
-      <optional>true</optional>
-    </dependency>
-    <dependency>
-      <groupId>jakarta.validation</groupId>
-      <artifactId>jakarta.validation-api</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.projectlombok</groupId>
-      <artifactId>lombok</artifactId>
-      <optional>true</optional>
-    </dependency>
-    <dependency>
-      <groupId>org.springframework.boot</groupId>
-      <artifactId>spring-boot-starter-test</artifactId>
-      <scope>test</scope>
-    </dependency>
-  </dependencies>
-
-  <build>
-    <plugins>
-      <plugin>
-        <groupId>org.springframework.boot</groupId>
-        <artifactId>spring-boot-maven-plugin</artifactId>
-        <configuration>
-          <excludes>
-            <exclude>
-              <groupId>org.projectlombok</groupId>
-              <artifactId>lombok</artifactId>
-            </exclude>
-          </excludes>
-        </configuration>
-      </plugin>
-    </plugins>
-  </build>
-
-</project>
diff --git a/transfer/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java b/transfer/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java
deleted file mode 100644 (file)
index 320b841..0000000
+++ /dev/null
@@ -1,51 +0,0 @@
-package de.juplo.kafka.payment.transfer;
-
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import de.juplo.kafka.payment.transfer.domain.TransferRepository;
-import de.juplo.kafka.payment.transfer.domain.TransferService;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.Bean;
-
-import java.util.Properties;
-import java.util.UUID;
-
-
-@SpringBootApplication
-@EnableConfigurationProperties(TransferServiceProperties.class)
-@Slf4j
-public class TransferServiceApplication
-{
-  @Bean(destroyMethod = "close")
-  KafkaProducer<String, String> producer(TransferServiceProperties properties)
-  {
-    Properties props = new Properties();
-    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.bootstrapServers);
-    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-
-    return new KafkaProducer<>(props);
-  }
-
-  @Bean
-  TransferService transferService(
-      TransferRepository repository,
-      KafkaProducer<String, String> producer,
-      ObjectMapper mapper,
-      TransferServiceProperties properties)
-  {
-    return new TransferService(repository, producer, mapper, properties.topic);
-  }
-
-
-  public static void main(String[] args)
-  {
-    SpringApplication.run(TransferServiceApplication.class, args);
-  }
-}
diff --git a/transfer/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceProperties.java b/transfer/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceProperties.java
deleted file mode 100644 (file)
index ccd22a3..0000000
+++ /dev/null
@@ -1,16 +0,0 @@
-package de.juplo.kafka.payment.transfer;
-
-
-import lombok.Getter;
-import lombok.Setter;
-import org.springframework.boot.context.properties.ConfigurationProperties;
-
-
-@ConfigurationProperties("juplo.transfer")
-@Getter
-@Setter
-public class TransferServiceProperties
-{
-  String bootstrapServers = "localhost:9092";
-  String topic = "transfers";
-}
diff --git a/transfer/src/main/java/de/juplo/kafka/payment/transfer/controller/TransferController.java b/transfer/src/main/java/de/juplo/kafka/payment/transfer/controller/TransferController.java
deleted file mode 100644 (file)
index 2d83ad9..0000000
+++ /dev/null
@@ -1,88 +0,0 @@
-package de.juplo.kafka.payment.transfer.controller;
-
-
-import de.juplo.kafka.payment.transfer.domain.Transfer;
-import de.juplo.kafka.payment.transfer.domain.TransferService;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.http.HttpStatus;
-import org.springframework.http.MediaType;
-import org.springframework.http.ResponseEntity;
-import org.springframework.validation.FieldError;
-import org.springframework.web.bind.MethodArgumentNotValidException;
-import org.springframework.web.bind.annotation.*;
-
-import javax.servlet.http.HttpServletRequest;
-import javax.validation.Valid;
-import java.net.URI;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-
-
-@RestController
-@RequiredArgsConstructor
-@Slf4j
- public class TransferController
-{
-  public final static String PATH = "/transfers";
-
-  private final TransferService service;
-
-
-  @PostMapping(
-      path = PATH,
-      consumes = MediaType.APPLICATION_JSON_VALUE,
-      produces = MediaType.APPLICATION_JSON_VALUE)
-  public ResponseEntity<?> transfer(@Valid @RequestBody TransferDTO transferDTO)
-  {
-    Transfer transfer =
-        Transfer
-            .builder()
-            .id(transferDTO.getId())
-            .payer(transferDTO.getPayer())
-            .payee(transferDTO.getPayee())
-            .amount(transferDTO.getAmount())
-            .build();
-
-    service.initiate(transfer);
-
-    return ResponseEntity.created(URI.create(PATH + "/" + transferDTO.getId())).build();
-  }
-
-  @GetMapping(
-      path = PATH + "/{id}",
-      produces = MediaType.APPLICATION_JSON_VALUE)
-  public ResponseEntity<TransferDTO> get(@PathVariable Long id)
-  {
-    return
-        service
-            .get(id)
-            .map(transfer -> ResponseEntity.ok(TransferDTO.of(transfer)))
-            .orElse(ResponseEntity.notFound().build());
-  }
-
-  @ResponseStatus(HttpStatus.BAD_REQUEST)
-  @ExceptionHandler(MethodArgumentNotValidException.class)
-  public Map<String, Object> handleValidationExceptions(
-      HttpServletRequest request,
-      MethodArgumentNotValidException e)
-  {
-    Map<String, Object> errorAttributes = new HashMap<>();
-    errorAttributes.put("status", HttpStatus.BAD_REQUEST.value());
-    errorAttributes.put("error", HttpStatus.BAD_REQUEST.getReasonPhrase());
-    errorAttributes.put("path", request.getRequestURI());
-    errorAttributes.put("method", request.getMethod());
-    errorAttributes.put("timestamp", new Date());
-    Map<String, String> errors = new HashMap<>();
-    e.getBindingResult().getAllErrors().forEach((error) -> {
-      String fieldName = ((FieldError) error).getField();
-      String errorMessage = error.getDefaultMessage();
-      errors.put(fieldName, errorMessage);
-    });
-    errorAttributes.put("errors", errors);
-    errorAttributes.put("message", "Validation failed: Invalid message format, error count: " + errors.size());
-    return errorAttributes;
-  }
-}
diff --git a/transfer/src/main/java/de/juplo/kafka/payment/transfer/controller/TransferDTO.java b/transfer/src/main/java/de/juplo/kafka/payment/transfer/controller/TransferDTO.java
deleted file mode 100644 (file)
index ad4f57d..0000000
+++ /dev/null
@@ -1,59 +0,0 @@
-package de.juplo.kafka.payment.transfer.controller;
-
-import de.juplo.kafka.payment.transfer.domain.Transfer;
-import lombok.Builder;
-import lombok.Data;
-
-import javax.validation.constraints.Min;
-import javax.validation.constraints.NotNull;
-
-
-/**
- * Simple DTO used by the REST interface
- */
-@Data
-@Builder
-public class TransferDTO
-{
-  @NotNull(message = "Cannot be null")
-  @Min(value = 1, message = "A valid transfer id must be a positive number")
-  private Long id;
-  @NotNull(message = "Cannot be null")
-  @Min(value = 1, message = "A valid bank account id must be a positive number")
-  private Long payer;
-  @NotNull(message = "Cannot be null")
-  @Min(value = 1, message = "A valid bank account id must be a positive number")
-  private Long payee;
-  @NotNull(message = "Cannot be null")
-  @Min(value = 1, message = "The amount of a transfer must be a positv value")
-  private Integer amount;
-
-  private Transfer.State state;
-
-
-  public Transfer toTransfer()
-  {
-    return
-        Transfer
-            .builder()
-            .id(id)
-            .payer(payer)
-            .payee(payee)
-            .amount(amount)
-            .build();
-  }
-
-
-  public static TransferDTO of(Transfer transfer)
-  {
-    return
-        TransferDTO
-            .builder()
-            .id(transfer.getId())
-            .payer(transfer.getPayer())
-            .payee(transfer.getPayee())
-            .amount(transfer.getAmount())
-            .state(transfer.getState())
-            .build();
-  }
-}
diff --git a/transfer/src/main/java/de/juplo/kafka/payment/transfer/domain/Transfer.java b/transfer/src/main/java/de/juplo/kafka/payment/transfer/domain/Transfer.java
deleted file mode 100644 (file)
index 5556a1b..0000000
+++ /dev/null
@@ -1,29 +0,0 @@
-package de.juplo.kafka.payment.transfer.domain;
-
-
-import lombok.Builder;
-import lombok.Data;
-import lombok.EqualsAndHashCode;
-
-
-@Data
-@Builder
-@EqualsAndHashCode(exclude = "state")
-public class Transfer
-{
-  public enum State
-  {
-    SENT,
-    FAILED,
-    PENDING,
-    APPROVED,
-    REJECTED
-  }
-
-  private final long id;
-  private final long payer;
-  private final long payee;
-  private final int amount;
-
-  private State state;
-}
diff --git a/transfer/src/main/java/de/juplo/kafka/payment/transfer/domain/TransferRepository.java b/transfer/src/main/java/de/juplo/kafka/payment/transfer/domain/TransferRepository.java
deleted file mode 100644 (file)
index 36d027c..0000000
+++ /dev/null
@@ -1,15 +0,0 @@
-package de.juplo.kafka.payment.transfer.domain;
-
-import java.util.Optional;
-
-
-public interface TransferRepository
-{
-  void store(Transfer transfer);
-
-  Optional<Transfer> get(Long id);
-
-  void update(Long id, Transfer.State oldState, Transfer.State newState) throws IllegalArgumentException;
-
-  void remove(Long id);
-}
diff --git a/transfer/src/main/java/de/juplo/kafka/payment/transfer/domain/TransferService.java b/transfer/src/main/java/de/juplo/kafka/payment/transfer/domain/TransferService.java
deleted file mode 100644 (file)
index d708826..0000000
+++ /dev/null
@@ -1,89 +0,0 @@
-package de.juplo.kafka.payment.transfer.domain;
-
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-
-import java.util.Optional;
-
-import static de.juplo.kafka.payment.transfer.domain.Transfer.State.*;
-
-
-@Slf4j
-@RequiredArgsConstructor
-public class TransferService
-{
-  private final TransferRepository repository;
-  private final KafkaProducer<String, String> producer;
-  private final ObjectMapper mapper;
-  private final String topic;
-
-  public synchronized void initiate(Transfer transfer)
-  {
-    repository
-        .get(transfer.getId())
-        .ifPresentOrElse(
-            stored ->
-            {
-              if (!transfer.equals(stored))
-                throw new IllegalArgumentException(
-                    "Re-Initiation of transfer with different data: old=" +
-                        stored +
-                        ", new=" +
-                        transfer);
-
-              if (stored.getState() == FAILED)
-              {
-                repository.update(transfer.getId(), FAILED, SENT);
-                log.info("Resending faild transfer: " + stored);
-                send(transfer);
-              }
-            },
-            () ->
-            {
-              send(transfer);
-              transfer.setState(SENT);
-              repository.store(transfer);
-            });
-  }
-
-
-  private void send(Transfer transfer)
-  {
-    try
-    {
-      ProducerRecord<String, String> record =
-          new ProducerRecord<>(
-              topic,
-              Long.toString(transfer.getId()),
-              mapper.writeValueAsString(transfer));
-
-      producer.send(record, (metadata, exception) ->
-      {
-        if (metadata != null)
-        {
-          log.debug("Sent {} to {}/{}:{}", transfer, metadata.topic(), metadata.partition(), metadata.offset());
-          repository.update(transfer.getId(), SENT, PENDING);
-        }
-        else
-        {
-          log.error("Could not send {}: {}", transfer, exception.getMessage());
-          repository.update(transfer.getId(), SENT, FAILED);
-        }
-      });
-    }
-    catch (JsonProcessingException e)
-    {
-      throw new RuntimeException("Could not convert " + transfer, e);
-    }
-  }
-
-  public Optional<Transfer> get(Long id)
-  {
-    return repository.get(id);
-  }
-}
diff --git a/transfer/src/main/java/de/juplo/kafka/payment/transfer/persistence/InMemoryTransferRepository.java b/transfer/src/main/java/de/juplo/kafka/payment/transfer/persistence/InMemoryTransferRepository.java
deleted file mode 100644 (file)
index 5ef2094..0000000
+++ /dev/null
@@ -1,81 +0,0 @@
-package de.juplo.kafka.payment.transfer.persistence;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import de.juplo.kafka.payment.transfer.domain.Transfer;
-import de.juplo.kafka.payment.transfer.domain.TransferRepository;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.stereotype.Component;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-
-
-@Component
-@RequiredArgsConstructor
-@Slf4j
-public class InMemoryTransferRepository implements TransferRepository
-{
-  private final Map<Long, String> map = new HashMap<>();
-  private final ObjectMapper mapper;
-
-
-  @Override
-  public synchronized void store(Transfer transfer)
-  {
-    Optional
-        .ofNullable(map.get(transfer.getId()))
-        .ifPresent(
-            json ->
-            {
-              throw new IllegalArgumentException("Could not overwrite " + json + " with " + transfer);
-            });
-
-    try
-    {
-      map.put(transfer.getId(), mapper.writeValueAsString(transfer));
-    }
-    catch (JsonProcessingException e)
-    {
-      log.error("Could not convert Transfer.class: {}", transfer, e);
-    }
-  }
-
-  @Override
-  public synchronized Optional<Transfer> get(Long id)
-  {
-    return
-        Optional
-            .ofNullable(map.get(id))
-            .map(json -> {
-              try
-              {
-                return mapper.readValue(json, Transfer.class);
-              }
-              catch (JsonProcessingException e)
-              {
-                throw new RuntimeException("Could not convert JSON: " + json, e);
-              }
-            });
-  }
-
-  @Override
-  public synchronized void update(Long id, Transfer.State oldState, Transfer.State newState)
-  {
-    Transfer transfer = get(id).orElseThrow(() -> new IllegalArgumentException("Could not find transfer " + id));
-
-    if (transfer.getState() != oldState)
-      throw new IllegalArgumentException(("Unexpectd state for " + transfer + ", expected: " + oldState));
-
-    transfer.setState(newState);
-    store(transfer);
-  }
-
-  @Override
-  public void remove(Long id)
-  {
-    map.remove(id);
-  }
-}
diff --git a/transfer/src/main/resources/application.properties b/transfer/src/main/resources/application.properties
deleted file mode 100644 (file)
index f22f985..0000000
+++ /dev/null
@@ -1,3 +0,0 @@
-management.endpoints.web.exposure.include=*
-
-logging.level.de.trion=info
diff --git a/transfer/src/test/java/de/juplo/kafka/payment/transfer/TransferServiceApplicationTests.java b/transfer/src/test/java/de/juplo/kafka/payment/transfer/TransferServiceApplicationTests.java
deleted file mode 100644 (file)
index 29b0d02..0000000
+++ /dev/null
@@ -1,13 +0,0 @@
-package de.juplo.kafka.payment.transfer;
-
-import org.junit.jupiter.api.Test;
-import org.springframework.boot.test.context.SpringBootTest;
-
-@SpringBootTest
-class TransferServiceApplicationTests {
-
-       @Test
-       void contextLoads() {
-       }
-
-}