Anzahl der Fehler für die Test-Logik verfügbar gemacht deserialization
authorKai Moritz <kai@juplo.de>
Sun, 11 Sep 2022 11:42:57 +0000 (13:42 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 11 Sep 2022 15:50:28 +0000 (17:50 +0200)
* Conflicts:
** src/test/java/de/juplo/kafka/ApplicationTests.java

src/test/java/de/juplo/kafka/ApplicationTests.java
src/test/java/de/juplo/kafka/GenericApplicationTests.java

index b7f8308..e5e0982 100644 (file)
@@ -22,46 +22,66 @@ public class ApplicationTests extends GenericApplicationTests<String, Long>
           final StringSerializer stringSerializer = new StringSerializer();
           final LongSerializer longSerializer = new LongSerializer();
 
+          int numberOfMessages;
+          int numberOfLogicErrors;
+          int numberOfPoisonPills;
+
 
           @Override
-          public int generate(
+          public void generate(
               boolean poisonPills,
               boolean logicErrors,
               Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
           {
-            int i = 0;
-
             for (int partition = 0; partition < 10; partition++)
             {
               for (int key = 0; key < 10000; key++)
               {
-                i++;
+                numberOfMessages++;
 
-                Bytes value = new Bytes(longSerializer.serialize(TOPIC, (long)i));
-                if (i == 99977)
+                Bytes value = new Bytes(longSerializer.serialize(TOPIC, (long) numberOfMessages));
+                if (numberOfMessages == 99977)
                 {
                   if (logicErrors)
                   {
                     value = new Bytes(longSerializer.serialize(TOPIC, Long.MIN_VALUE));
+                    numberOfLogicErrors++;
                   }
                   if (poisonPills)
                   {
                     value = new Bytes(stringSerializer.serialize(TOPIC, "BOOM (Poison-Pill)!"));
+                    numberOfPoisonPills++;
                   }
                 }
 
                 ProducerRecord<Bytes, Bytes> record =
-                    new ProducerRecord<>(
-                        TOPIC,
-                        partition,
-                        new Bytes(stringSerializer.serialize(TOPIC,Integer.toString(partition*10+key%2))),
-                        value);
+                  new ProducerRecord<>(
+                    TOPIC,
+                    partition,
+                    new Bytes(stringSerializer.serialize(TOPIC, Integer.toString(partition * 10 + key % 2))),
+                    value);
 
                 messageSender.accept(record);
               }
             }
+          }
+
+          @Override
+          public int getNumberOfMessages()
+          {
+            return numberOfMessages;
+          }
 
-            return i;
+          @Override
+          public int getNumberOfPoisonPills()
+          {
+            return numberOfPoisonPills;
+          }
+
+          @Override
+          public int getNumberOfLogicErrors()
+          {
+            return numberOfLogicErrors;
           }
         });
   }
index 4883f75..5019466 100644 (file)
@@ -78,8 +78,9 @@ abstract class GenericApplicationTests<K, V>
        @Test
        void commitsCurrentOffsetsOnSuccess() throws Exception
        {
-               int numberOfGeneratedMessages =
-                               recordGenerator.generate(false, false, messageSender);
+               recordGenerator.generate(false, false, messageSender);
+
+               int numberOfGeneratedMessages = recordGenerator.getNumberOfMessages();
 
                await(numberOfGeneratedMessages + " records received")
                                .atMost(Duration.ofSeconds(30))
@@ -107,8 +108,9 @@ abstract class GenericApplicationTests<K, V>
        @SkipWhenErrorCannotBeGenerated(poisonPill = true)
        void commitsOffsetOfErrorForReprocessingOnDeserializationError()
        {
-               int numberOfGeneratedMessages =
-                               recordGenerator.generate(true, false, messageSender);
+               recordGenerator.generate(true, false, messageSender);
+
+               int numberOfGeneratedMessages = recordGenerator.getNumberOfMessages();
 
                await("Consumer failed")
                                .atMost(Duration.ofSeconds(30))
@@ -144,8 +146,9 @@ abstract class GenericApplicationTests<K, V>
        @SkipWhenErrorCannotBeGenerated(logicError = true)
        void doesNotCommitOffsetsOnLogicError()
        {
-               int numberOfGeneratedMessages =
-                               recordGenerator.generate(false, true, messageSender);
+               recordGenerator.generate(false, true, messageSender);
+
+               int numberOfGeneratedMessages = recordGenerator.getNumberOfMessages();
 
                await("Consumer failed")
                                .atMost(Duration.ofSeconds(30))
@@ -263,11 +266,15 @@ abstract class GenericApplicationTests<K, V>
 
        public interface RecordGenerator
        {
-               int generate(
+               void generate(
                                boolean poisonPills,
                                boolean logicErrors,
                                Consumer<ProducerRecord<Bytes, Bytes>> messageSender);
 
+               int getNumberOfMessages();
+               int getNumberOfPoisonPills();
+               int getNumberOfLogicErrors();
+
                default boolean canGeneratePoisonPill()
                {
                        return true;