Der Test verwendet die `@Bean` von `EndlessConsumer`
[demos/kafka/training] / src / test / java / de / juplo / kafka / ApplicationTests.java
index 4ff2a41..bd9f449 100644 (file)
@@ -4,7 +4,6 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
-import org.assertj.core.api.Assertions;
 import org.springframework.beans.factory.annotation.Autowired;
 
 import java.util.*;
@@ -16,7 +15,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 
 @Slf4j
-public class ApplicationTests extends GenericApplicationTests<String, String>
+public class ApplicationTests extends GenericApplicationTests<String, Message>
 {
   @Autowired
   StateRepository stateRepository;
@@ -40,7 +39,7 @@ public class ApplicationTests extends GenericApplicationTests<String, String>
             .mapToObj(i -> "seeräuber-" + i)
             .toArray(i -> new String[i]);
     final StringSerializer stringSerializer = new StringSerializer();
-    final Bytes calculateMessage = new Bytes(stringSerializer.serialize(TOPIC, "CALCULATE"));
+    final Bytes calculateMessage = new Bytes(stringSerializer.serialize(TOPIC, "{}"));
 
     int counter = 0;
 
@@ -73,7 +72,13 @@ public class ApplicationTests extends GenericApplicationTests<String, String>
 
           if (message[i] > number[i])
           {
-            send(key, calculateMessage, fail(logicErrors, pass, counter), messageSender);
+            send(
+              key,
+              calculateMessage,
+              Message.Type.CALC,
+              poisonPill(poisonPills, pass, counter),
+              logicError(logicErrors, pass, counter),
+              messageSender);
             state.get(seeräuber).add(new AdderResult(number[i], (number[i] + 1) * number[i] / 2));
             // Pick next number to calculate
             number[i] = numbers[next++%numbers.length];
@@ -81,15 +86,25 @@ public class ApplicationTests extends GenericApplicationTests<String, String>
             log.debug("Seeräuber {} will die Summe für {} berechnen", seeräuber, number[i]);
           }
 
-          Bytes value = new Bytes(stringSerializer.serialize(TOPIC, Integer.toString(message[i]++)));
-          send(key, value, fail(logicErrors, pass, counter), messageSender);
+          send(
+            key,
+            new Bytes(stringSerializer.serialize(TOPIC, "{\"next\":" + message[i]++ + "}")),
+            Message.Type.ADD,
+            poisonPill(poisonPills, pass, counter),
+            logicError(logicErrors, pass, counter),
+            messageSender);
         }
       }
 
       return counter;
     }
 
-    boolean fail (boolean logicErrors, int pass, int counter)
+    boolean poisonPill (boolean poisonPills, int pass, int counter)
+    {
+      return poisonPills && pass > 300 && counter%99 == 0;
+    }
+
+    boolean logicError(boolean logicErrors, int pass, int counter)
     {
       return logicErrors && pass > 300 && counter%77 == 0;
     }
@@ -97,23 +112,25 @@ public class ApplicationTests extends GenericApplicationTests<String, String>
     void send(
         Bytes key,
         Bytes value,
-        boolean fail,
+        Message.Type type,
+        boolean poisonPill,
+        boolean logicError,
         Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
     {
       counter++;
 
-      if (fail)
+      if (logicError)
       {
-        value = new Bytes(stringSerializer.serialize(TOPIC, Integer.toString(-1)));
+        value = new Bytes(stringSerializer.serialize(TOPIC, "{\"next\":-1}"));
+      }
+      if (poisonPill)
+      {
+        value = new Bytes("BOOM!".getBytes());
       }
 
-      messageSender.accept(new ProducerRecord<>(TOPIC, key, value));
-    }
-
-    @Override
-    public boolean canGeneratePoisonPill()
-    {
-      return false;
+      ProducerRecord<Bytes, Bytes> record = new ProducerRecord<>(TOPIC, key, value);
+      record.headers().add("__TypeId__", type.toString().getBytes());
+      messageSender.accept(record);
     }
 
     @Override