Service ergänzt, der das Dead-Letter-Topic ausliest
[demos/kafka/training] / src / test / java / de / juplo / kafka / ApplicationTests.java
index bd9f449..e01fdd1 100644 (file)
@@ -41,17 +41,22 @@ public class ApplicationTests extends GenericApplicationTests<String, Message>
     final StringSerializer stringSerializer = new StringSerializer();
     final Bytes calculateMessage = new Bytes(stringSerializer.serialize(TOPIC, "{}"));
 
-    int counter = 0;
+    int counterMessages;
+    int counterPoisonPills;
+    int counterLogicErrors;
 
     Map<String, List<AdderResult>> state;
 
     @Override
-    public int generate(
+    public void generate(
         boolean poisonPills,
         boolean logicErrors,
         Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
     {
-      counter = 0;
+      counterMessages = 0;
+      counterPoisonPills = 0;
+      counterLogicErrors = 0;
+
       state =
           Arrays
               .stream(dieWilden13)
@@ -76,8 +81,8 @@ public class ApplicationTests extends GenericApplicationTests<String, Message>
               key,
               calculateMessage,
               Message.Type.CALC,
-              poisonPill(poisonPills, pass, counter),
-              logicError(logicErrors, pass, counter),
+              poisonPill(poisonPills, pass, counterMessages),
+              logicError(logicErrors, pass, counterMessages),
               messageSender);
             state.get(seeräuber).add(new AdderResult(number[i], (number[i] + 1) * number[i] / 2));
             // Pick next number to calculate
@@ -90,13 +95,29 @@ public class ApplicationTests extends GenericApplicationTests<String, Message>
             key,
             new Bytes(stringSerializer.serialize(TOPIC, "{\"next\":" + message[i]++ + "}")),
             Message.Type.ADD,
-            poisonPill(poisonPills, pass, counter),
-            logicError(logicErrors, pass, counter),
+            poisonPill(poisonPills, pass, counterMessages),
+            logicError(logicErrors, pass, counterMessages),
             messageSender);
         }
       }
+    }
+
+    @Override
+    public int getNumberOfMessages()
+    {
+      return counterMessages;
+    }
 
-      return counter;
+    @Override
+    public int getNumberOfPoisonPills()
+    {
+      return counterPoisonPills;
+    }
+
+    @Override
+    public int getNumberOfLogicErrors()
+    {
+      return counterLogicErrors;
     }
 
     boolean poisonPill (boolean poisonPills, int pass, int counter)
@@ -117,15 +138,17 @@ public class ApplicationTests extends GenericApplicationTests<String, Message>
         boolean logicError,
         Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
     {
-      counter++;
+      counterMessages++;
 
       if (logicError)
       {
         value = new Bytes(stringSerializer.serialize(TOPIC, "{\"next\":-1}"));
+        counterLogicErrors++;
       }
       if (poisonPill)
       {
         value = new Bytes("BOOM!".getBytes());
+        counterPoisonPills++;
       }
 
       ProducerRecord<Bytes, Bytes> record = new ProducerRecord<>(TOPIC, key, value);