Tests aus gemerged springified-consumer--serialization -> deserialization
[demos/kafka/training] / src / test / java / de / juplo / kafka / GenericApplicationTest.java
index 68f150f..a6d6aa1 100644 (file)
@@ -49,13 +49,14 @@ abstract class GenericApplicationTest<K, V>
        @Autowired
        KafkaConsumer<K, V> kafkaConsumer;
        @Autowired
+       Consumer<ConsumerRecord<K, V>> consumer;
+       @Autowired
        ApplicationProperties properties;
        @Autowired
        ExecutorService executor;
 
        KafkaProducer<Bytes, Bytes> testRecordProducer;
        KafkaConsumer<Bytes, Bytes> offsetConsumer;
-       Consumer<ConsumerRecord<K, V>> testHandler;
        EndlessConsumer<K, V> endlessConsumer;
        Map<TopicPartition, Long> oldOffsets;
        Map<TopicPartition, Long> newOffsets;
@@ -77,7 +78,7 @@ abstract class GenericApplicationTest<K, V>
        @Test
        void commitsCurrentOffsetsOnSuccess()
        {
-               recordGenerator.generate(100, Set.of(), messageSender);
+               recordGenerator.generate(100, Set.of(), Set.of(), messageSender);
 
                await("100 records received")
                                .atMost(Duration.ofSeconds(30))
@@ -101,7 +102,7 @@ abstract class GenericApplicationTest<K, V>
        @Test
        void commitsOffsetOfErrorForReprocessingOnDeserializationError()
        {
-               recordGenerator.generate(100, Set.of(77), messageSender);
+               recordGenerator.generate(100, Set.of(77), Set.of(), messageSender);
 
                await("Consumer failed")
                                .atMost(Duration.ofSeconds(30))
@@ -131,6 +132,39 @@ abstract class GenericApplicationTest<K, V>
                                .containsInstanceOf(RecordDeserializationException.class);
        }
 
+       @Test
+       void doesNotCommitOffsetsOnLogicError()
+       {
+               recordGenerator.generate(100, Set.of(), Set.of(77), messageSender);
+
+               await("Consumer failed")
+                               .atMost(Duration.ofSeconds(30))
+                               .pollInterval(Duration.ofSeconds(1))
+                               .until(() -> !endlessConsumer.running());
+
+               checkSeenOffsetsForProgress();
+               compareToCommitedOffsets(oldOffsets);
+
+               endlessConsumer.start();
+               await("Consumer failed")
+                               .atMost(Duration.ofSeconds(30))
+                               .pollInterval(Duration.ofSeconds(1))
+                               .until(() -> !endlessConsumer.running());
+
+               checkSeenOffsetsForProgress();
+               compareToCommitedOffsets(oldOffsets);
+               assertThat(receivedRecords.size())
+                               .describedAs("Received not all sent events")
+                               .isLessThan(100);
+
+               assertThatNoException()
+                               .describedAs("Consumer should not be running")
+                               .isThrownBy(() -> endlessConsumer.exitStatus());
+               assertThat(endlessConsumer.exitStatus())
+                               .describedAs("Consumer should have exited abnormally")
+                               .containsInstanceOf(RuntimeException.class);
+       }
+
 
        /** Helper methods for the verification of expectations */
 
@@ -204,7 +238,8 @@ abstract class GenericApplicationTest<K, V>
        {
                void generate(
                                int numberOfMessagesToGenerate,
-                               Set<Integer> poistionPills,
+                               Set<Integer> poisonPills,
+                               Set<Integer> logicErrors,
                                Consumer<ProducerRecord<Bytes, Bytes>> messageSender);
        }
 
@@ -252,8 +287,6 @@ abstract class GenericApplicationTest<K, V>
                props.put("value.deserializer", BytesDeserializer.class.getName());
                offsetConsumer = new KafkaConsumer<>(props);
 
-               testHandler = record -> {} ;
-
                seekToEnd();
 
                oldOffsets = new HashMap<>();
@@ -273,7 +306,7 @@ abstract class GenericApplicationTest<K, V>
                                                        new TopicPartition(record.topic(), record.partition()),
                                                        record.offset());
                                        receivedRecords.add(record);
-                                       testHandler.accept(record);
+                                       consumer.accept(record);
                                };
 
                endlessConsumer =
@@ -305,5 +338,7 @@ abstract class GenericApplicationTest<K, V>
 
        @TestConfiguration
        @Import(ApplicationConfiguration.class)
-       public static class Configuration {}
+       public static class Configuration
+       {
+       }
 }