Tests aus gemerged springified-consumer--serialization -> deserialization
authorKai Moritz <kai@juplo.de>
Sun, 14 Aug 2022 10:06:01 +0000 (12:06 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 14 Aug 2022 10:55:12 +0000 (12:55 +0200)
* Es wurde nur der hinzugefügte Test übernommen.
* Der hinzugefügte Test wurde an das von Spring-Kafka abweichende
  Verhalten bei einem Logik-Fehler angepasst: Kafka führt nicht automatisch
  Seeks oder einene Commit durch. Da `EndlessConsumer` bei einem
  Logik-Fehler explizit ein `unsubscribe()` durchführt, wird kein
  Offset-Commit durchgefürt, so dass die alten Offset-Positionen gültig
  bleiben.
* Der Test wurde entsprechend umbenannt.
* `RecordGenerator` wurde um einen weiteren Integer-Set erweitert, über
  den die Indizes der zu erzeugenden Logik-Fehler gesetzt werden können.
* Der hinzugefügte Test wurde auf die überarbeitete Methode zur Erzeugung
  der Test-Nachrichten umgestellt.
* `ApplicationTest` wurde so ergänzt, dass der für den hinzugefügten Test
  benötigte Logik-Fehler erzeugt wird.

src/test/java/de/juplo/kafka/ApplicationTest.java
src/test/java/de/juplo/kafka/GenericApplicationTest.java

index 81165ab..ed93a21 100644 (file)
@@ -1,14 +1,20 @@
 package de.juplo.kafka;
 
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Primary;
+import org.springframework.test.context.ContextConfiguration;
 
 import java.util.Set;
 import java.util.function.Consumer;
 
 
+@ContextConfiguration(classes = ApplicationTest.Configuration.class)
 public class ApplicationTest extends GenericApplicationTest<String, Long>
 {
   public ApplicationTest()
@@ -24,6 +30,7 @@ public class ApplicationTest extends GenericApplicationTest<String, Long>
           public void generate(
               int numberOfMessagesToGenerate,
               Set<Integer> poisonPills,
+              Set<Integer> logicErrors,
               Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
           {
             int i = 0;
@@ -35,10 +42,15 @@ public class ApplicationTest extends GenericApplicationTest<String, Long>
                 if (++i > numberOfMessagesToGenerate)
                   return;
 
-                Bytes value =
-                    poisonPills.contains(i)
-                        ? new Bytes(stringSerializer.serialize(TOPIC, "BOOM!"))
-                        : new Bytes(longSerializer.serialize(TOPIC, (long)i));
+                Bytes value = new Bytes(longSerializer.serialize(TOPIC, (long)i));
+                if (logicErrors.contains(i))
+                {
+                  value = new Bytes(longSerializer.serialize(TOPIC, Long.MIN_VALUE));
+                }
+                if (poisonPills.contains(i))
+                {
+                  value = new Bytes(stringSerializer.serialize(TOPIC, "BOOM (Poison-Pill)!"));
+                }
 
                 ProducerRecord<Bytes, Bytes> record =
                     new ProducerRecord<>(
@@ -53,4 +65,20 @@ public class ApplicationTest extends GenericApplicationTest<String, Long>
           }
         });
   }
+
+
+  @TestConfiguration
+  public static class Configuration
+  {
+    @Primary
+    @Bean
+    public Consumer<ConsumerRecord<String, Long>> consumer()
+    {
+      return (record) ->
+      {
+        if (record.value() == Long.MIN_VALUE)
+          throw new RuntimeException("BOOM (Logic-Error)!");
+      };
+    }
+  }
 }
index 68f150f..a6d6aa1 100644 (file)
@@ -49,13 +49,14 @@ abstract class GenericApplicationTest<K, V>
        @Autowired
        KafkaConsumer<K, V> kafkaConsumer;
        @Autowired
+       Consumer<ConsumerRecord<K, V>> consumer;
+       @Autowired
        ApplicationProperties properties;
        @Autowired
        ExecutorService executor;
 
        KafkaProducer<Bytes, Bytes> testRecordProducer;
        KafkaConsumer<Bytes, Bytes> offsetConsumer;
-       Consumer<ConsumerRecord<K, V>> testHandler;
        EndlessConsumer<K, V> endlessConsumer;
        Map<TopicPartition, Long> oldOffsets;
        Map<TopicPartition, Long> newOffsets;
@@ -77,7 +78,7 @@ abstract class GenericApplicationTest<K, V>
        @Test
        void commitsCurrentOffsetsOnSuccess()
        {
-               recordGenerator.generate(100, Set.of(), messageSender);
+               recordGenerator.generate(100, Set.of(), Set.of(), messageSender);
 
                await("100 records received")
                                .atMost(Duration.ofSeconds(30))
@@ -101,7 +102,7 @@ abstract class GenericApplicationTest<K, V>
        @Test
        void commitsOffsetOfErrorForReprocessingOnDeserializationError()
        {
-               recordGenerator.generate(100, Set.of(77), messageSender);
+               recordGenerator.generate(100, Set.of(77), Set.of(), messageSender);
 
                await("Consumer failed")
                                .atMost(Duration.ofSeconds(30))
@@ -131,6 +132,39 @@ abstract class GenericApplicationTest<K, V>
                                .containsInstanceOf(RecordDeserializationException.class);
        }
 
+       @Test
+       void doesNotCommitOffsetsOnLogicError()
+       {
+               recordGenerator.generate(100, Set.of(), Set.of(77), messageSender);
+
+               await("Consumer failed")
+                               .atMost(Duration.ofSeconds(30))
+                               .pollInterval(Duration.ofSeconds(1))
+                               .until(() -> !endlessConsumer.running());
+
+               checkSeenOffsetsForProgress();
+               compareToCommitedOffsets(oldOffsets);
+
+               endlessConsumer.start();
+               await("Consumer failed")
+                               .atMost(Duration.ofSeconds(30))
+                               .pollInterval(Duration.ofSeconds(1))
+                               .until(() -> !endlessConsumer.running());
+
+               checkSeenOffsetsForProgress();
+               compareToCommitedOffsets(oldOffsets);
+               assertThat(receivedRecords.size())
+                               .describedAs("Received not all sent events")
+                               .isLessThan(100);
+
+               assertThatNoException()
+                               .describedAs("Consumer should not be running")
+                               .isThrownBy(() -> endlessConsumer.exitStatus());
+               assertThat(endlessConsumer.exitStatus())
+                               .describedAs("Consumer should have exited abnormally")
+                               .containsInstanceOf(RuntimeException.class);
+       }
+
 
        /** Helper methods for the verification of expectations */
 
@@ -204,7 +238,8 @@ abstract class GenericApplicationTest<K, V>
        {
                void generate(
                                int numberOfMessagesToGenerate,
-                               Set<Integer> poistionPills,
+                               Set<Integer> poisonPills,
+                               Set<Integer> logicErrors,
                                Consumer<ProducerRecord<Bytes, Bytes>> messageSender);
        }
 
@@ -252,8 +287,6 @@ abstract class GenericApplicationTest<K, V>
                props.put("value.deserializer", BytesDeserializer.class.getName());
                offsetConsumer = new KafkaConsumer<>(props);
 
-               testHandler = record -> {} ;
-
                seekToEnd();
 
                oldOffsets = new HashMap<>();
@@ -273,7 +306,7 @@ abstract class GenericApplicationTest<K, V>
                                                        new TopicPartition(record.topic(), record.partition()),
                                                        record.offset());
                                        receivedRecords.add(record);
-                                       testHandler.accept(record);
+                                       consumer.accept(record);
                                };
 
                endlessConsumer =
@@ -305,5 +338,7 @@ abstract class GenericApplicationTest<K, V>
 
        @TestConfiguration
        @Import(ApplicationConfiguration.class)
-       public static class Configuration {}
+       public static class Configuration
+       {
+       }
 }