Code an die Version aus 'sumup-adder--springified' angepasst
[demos/kafka/training] / src / test / java / de / juplo / kafka / ApplicationTests.java
index 740c09c..bd9f449 100644 (file)
@@ -15,7 +15,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 
 @Slf4j
-public class ApplicationTests extends GenericApplicationTests<String, String>
+public class ApplicationTests extends GenericApplicationTests<String, Message>
 {
   @Autowired
   StateRepository stateRepository;
@@ -39,7 +39,7 @@ public class ApplicationTests extends GenericApplicationTests<String, String>
             .mapToObj(i -> "seeräuber-" + i)
             .toArray(i -> new String[i]);
     final StringSerializer stringSerializer = new StringSerializer();
-    final Bytes calculateMessage = new Bytes(stringSerializer.serialize(TOPIC, "CALCULATE"));
+    final Bytes calculateMessage = new Bytes(stringSerializer.serialize(TOPIC, "{}"));
 
     int counter = 0;
 
@@ -72,7 +72,13 @@ public class ApplicationTests extends GenericApplicationTests<String, String>
 
           if (message[i] > number[i])
           {
-            send(key, calculateMessage, fail(logicErrors, pass, counter), messageSender);
+            send(
+              key,
+              calculateMessage,
+              Message.Type.CALC,
+              poisonPill(poisonPills, pass, counter),
+              logicError(logicErrors, pass, counter),
+              messageSender);
             state.get(seeräuber).add(new AdderResult(number[i], (number[i] + 1) * number[i] / 2));
             // Pick next number to calculate
             number[i] = numbers[next++%numbers.length];
@@ -80,15 +86,25 @@ public class ApplicationTests extends GenericApplicationTests<String, String>
             log.debug("Seeräuber {} will die Summe für {} berechnen", seeräuber, number[i]);
           }
 
-          Bytes value = new Bytes(stringSerializer.serialize(TOPIC, Integer.toString(message[i]++)));
-          send(key, value, fail(logicErrors, pass, counter), messageSender);
+          send(
+            key,
+            new Bytes(stringSerializer.serialize(TOPIC, "{\"next\":" + message[i]++ + "}")),
+            Message.Type.ADD,
+            poisonPill(poisonPills, pass, counter),
+            logicError(logicErrors, pass, counter),
+            messageSender);
         }
       }
 
       return counter;
     }
 
-    boolean fail (boolean logicErrors, int pass, int counter)
+    boolean poisonPill (boolean poisonPills, int pass, int counter)
+    {
+      return poisonPills && pass > 300 && counter%99 == 0;
+    }
+
+    boolean logicError(boolean logicErrors, int pass, int counter)
     {
       return logicErrors && pass > 300 && counter%77 == 0;
     }
@@ -96,23 +112,25 @@ public class ApplicationTests extends GenericApplicationTests<String, String>
     void send(
         Bytes key,
         Bytes value,
-        boolean fail,
+        Message.Type type,
+        boolean poisonPill,
+        boolean logicError,
         Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
     {
       counter++;
 
-      if (fail)
+      if (logicError)
       {
-        value = new Bytes(stringSerializer.serialize(TOPIC, Integer.toString(-1)));
+        value = new Bytes(stringSerializer.serialize(TOPIC, "{\"next\":-1}"));
+      }
+      if (poisonPill)
+      {
+        value = new Bytes("BOOM!".getBytes());
       }
 
-      messageSender.accept(new ProducerRecord<>(TOPIC, key, value));
-    }
-
-    @Override
-    public boolean canGeneratePoisonPill()
-    {
-      return false;
+      ProducerRecord<Bytes, Bytes> record = new ProducerRecord<>(TOPIC, key, value);
+      record.headers().add("__TypeId__", type.toString().getBytes());
+      messageSender.accept(record);
     }
 
     @Override
@@ -124,15 +142,29 @@ public class ApplicationTests extends GenericApplicationTests<String, String>
             tests.stateRepository.findById(Integer.toString(i)).get();
 
         stateDocument
-            .results.entrySet().stream()
+            .results
+            .entrySet()
+            .stream()
             .forEach(entry ->
             {
               String user = entry.getKey();
               List<AdderResult> resultsForUser = entry.getValue();
 
+              for (int j=0; j < resultsForUser.size(); j++)
+              {
+                if (!(j < state.get(user).size()))
+                {
+                  break;
+                }
+
+                assertThat(resultsForUser.get(j))
+                    .as("Unexpected results calculation %d of user %s", j, user)
+                    .isEqualTo(state.get(user).get(j));
+              }
+
               assertThat(state.get(user))
-                  .as("Unexpected results for user %s", user)
-                  .containsExactlyElementsOf(resultsForUser);
+                  .as("More results calculated for user %s as expected", user)
+                  .containsAll(resultsForUser);
             });
       }
     }