Version von SumUp-Requests, die einen fachlichen Fehler erzeugt
[demos/kafka/training] / src / main / java / de / juplo / kafka / ApplicationRecordHandler.java
index 8431a53..abc8868 100644 (file)
@@ -6,15 +6,20 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 
+import java.util.Optional;
+
 
 @RequiredArgsConstructor
 @Slf4j
 public class ApplicationRecordHandler implements RecordHandler<String, Integer>
 {
   private final Producer<String, Object> producer;
+  private final Optional<Integer> errorPosition;
   private final String id;
   private final String topic;
 
+  private int counter = 0;
+
 
   @Override
   public void accept(ConsumerRecord<String, Integer> record)
@@ -24,6 +29,11 @@ public class ApplicationRecordHandler implements RecordHandler<String, Integer>
 
     for (int i = 1; i <= number; i++)
     {
+      if (errorPosition.isPresent() && ++counter == errorPosition.get())
+      {
+        log.info("{} - Erzeuge fachlichen Fehler!");
+        send(key, new AddNumberMessage(number, counter * -1));
+      }
       send(key, new AddNumberMessage(number, i));
     }
     send(key, new CalculateSumMessage(number));