Version von SumUp-Requests, die einen fachlichen Fehler erzeugt sumup-requests--json--fehlerteufel sumup-requests--fehlerteufel---lvm-2-tage
authorKai Moritz <kai@juplo.de>
Sun, 18 Sep 2022 04:44:38 +0000 (06:44 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 18 Sep 2022 05:01:08 +0000 (07:01 +0200)
README.sh
docker-compose.yml
pom.xml
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/ApplicationProperties.java
src/main/java/de/juplo/kafka/ApplicationRecordHandler.java

index d0a401e..f10f417 100755 (executable)
--- a/README.sh
+++ b/README.sh
@@ -1,6 +1,6 @@
 #!/bin/bash
 
-IMAGE=juplo/sumup-requests-json:1.0-SNAPSHOT
+IMAGE=juplo/sumup-requests-fehlerteufel:1.0-SNAPSHOT
 
 if [ "$1" = "cleanup" ]
 then
index c4492a4..e59bb74 100644 (file)
@@ -87,10 +87,11 @@ services:
       sumup.gateway.topic: in
 
   requests:
-    image: juplo/sumup-requests-json:1.0-SNAPSHOT
+    image: juplo/sumup-requests-fehlerteufel:1.0-SNAPSHOT
     ports:
       - 8081:8080
     environment:
       server.port: 8080
       sumup.requests.bootstrap-server: kafka:9092
       sumup.requests.client-id: requests
+      sumup.requests.error-position: 6
diff --git a/pom.xml b/pom.xml
index a0fe12a..470a1aa 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -12,8 +12,8 @@
   </parent>
 
   <groupId>de.juplo.kafka</groupId>
-  <artifactId>sumup-requests-json</artifactId>
-  <name>SumUp-Requests</name>
+  <artifactId>sumup-requests-fehlerteufel</artifactId>
+  <name>SumUp-Requests (Fehlerteufel!)</name>
   <description>A service that reads computation requests from an incomming topic and generates according messages for the SumUp-Consumer on an outgoing topic. This version generates two types of JSON-messages.</description>
   <version>1.0-SNAPSHOT</version>
 
index 033d0cc..598a694 100644 (file)
@@ -9,6 +9,8 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.kafka.support.serializer.JsonSerializer;
+
+import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -25,6 +27,7 @@ public class ApplicationConfiguration
   {
     return new ApplicationRecordHandler(
         kafkaProducer,
+        Optional.ofNullable(properties.getErrorPosition()),
         properties.getClientId(),
         properties.getTopicOut());
   }
index ccddc81..d113908 100644 (file)
@@ -16,6 +16,7 @@ import java.time.Duration;
 @Setter
 public class ApplicationProperties
 {
+  private Integer errorPosition;
   @NotNull
   @NotEmpty
   private String bootstrapServer;
index 8431a53..abc8868 100644 (file)
@@ -6,15 +6,20 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 
+import java.util.Optional;
+
 
 @RequiredArgsConstructor
 @Slf4j
 public class ApplicationRecordHandler implements RecordHandler<String, Integer>
 {
   private final Producer<String, Object> producer;
+  private final Optional<Integer> errorPosition;
   private final String id;
   private final String topic;
 
+  private int counter = 0;
+
 
   @Override
   public void accept(ConsumerRecord<String, Integer> record)
@@ -24,6 +29,11 @@ public class ApplicationRecordHandler implements RecordHandler<String, Integer>
 
     for (int i = 1; i <= number; i++)
     {
+      if (errorPosition.isPresent() && ++counter == errorPosition.get())
+      {
+        log.info("{} - Erzeuge fachlichen Fehler!");
+        send(key, new AddNumberMessage(number, counter * -1));
+      }
       send(key, new AddNumberMessage(number, i));
     }
     send(key, new CalculateSumMessage(number));