Tests aus gemerged springified-consumer--serialization -> deserialization
[demos/kafka/training] / src / test / java / de / juplo / kafka / GenericApplicationTest.java
index 6b2b635..a6d6aa1 100644 (file)
@@ -49,28 +49,25 @@ abstract class GenericApplicationTest<K, V>
        @Autowired
        KafkaConsumer<K, V> kafkaConsumer;
        @Autowired
+       Consumer<ConsumerRecord<K, V>> consumer;
+       @Autowired
        ApplicationProperties properties;
        @Autowired
        ExecutorService executor;
 
-       KafkaProducer<K, Bytes> testRecordProducer;
+       KafkaProducer<Bytes, Bytes> testRecordProducer;
        KafkaConsumer<Bytes, Bytes> offsetConsumer;
-       Consumer<ConsumerRecord<K, V>> testHandler;
        EndlessConsumer<K, V> endlessConsumer;
        Map<TopicPartition, Long> oldOffsets;
        Map<TopicPartition, Long> newOffsets;
        Set<ConsumerRecord<K, V>> receivedRecords;
 
 
-       final Serializer<K> keySerializer;
-       final RecordGenerator<K> recordGenerator;
-       final Consumer<ProducerRecord<K, Bytes>> messageSender;
+       final RecordGenerator recordGenerator;
+       final Consumer<ProducerRecord<Bytes, Bytes>> messageSender;
 
-       public GenericApplicationTest(
-                       Serializer<K> keySerializer,
-                       RecordGenerator<K> recordGenerator)
+       public GenericApplicationTest(RecordGenerator recordGenerator)
        {
-               this.keySerializer = keySerializer;
                this.recordGenerator = recordGenerator;
                this.messageSender = (record) -> sendMessage(record);
        }
@@ -81,7 +78,7 @@ abstract class GenericApplicationTest<K, V>
        @Test
        void commitsCurrentOffsetsOnSuccess()
        {
-               recordGenerator.generate(100, Set.of(), messageSender);
+               recordGenerator.generate(100, Set.of(), Set.of(), messageSender);
 
                await("100 records received")
                                .atMost(Duration.ofSeconds(30))
@@ -105,7 +102,7 @@ abstract class GenericApplicationTest<K, V>
        @Test
        void commitsOffsetOfErrorForReprocessingOnDeserializationError()
        {
-               recordGenerator.generate(100, Set.of(77), messageSender);
+               recordGenerator.generate(100, Set.of(77), Set.of(), messageSender);
 
                await("Consumer failed")
                                .atMost(Duration.ofSeconds(30))
@@ -135,6 +132,39 @@ abstract class GenericApplicationTest<K, V>
                                .containsInstanceOf(RecordDeserializationException.class);
        }
 
+       @Test
+       void doesNotCommitOffsetsOnLogicError()
+       {
+               recordGenerator.generate(100, Set.of(), Set.of(77), messageSender);
+
+               await("Consumer failed")
+                               .atMost(Duration.ofSeconds(30))
+                               .pollInterval(Duration.ofSeconds(1))
+                               .until(() -> !endlessConsumer.running());
+
+               checkSeenOffsetsForProgress();
+               compareToCommitedOffsets(oldOffsets);
+
+               endlessConsumer.start();
+               await("Consumer failed")
+                               .atMost(Duration.ofSeconds(30))
+                               .pollInterval(Duration.ofSeconds(1))
+                               .until(() -> !endlessConsumer.running());
+
+               checkSeenOffsetsForProgress();
+               compareToCommitedOffsets(oldOffsets);
+               assertThat(receivedRecords.size())
+                               .describedAs("Received not all sent events")
+                               .isLessThan(100);
+
+               assertThatNoException()
+                               .describedAs("Consumer should not be running")
+                               .isThrownBy(() -> endlessConsumer.exitStatus());
+               assertThat(endlessConsumer.exitStatus())
+                               .describedAs("Consumer should have exited abnormally")
+                               .containsInstanceOf(RuntimeException.class);
+       }
+
 
        /** Helper methods for the verification of expectations */
 
@@ -204,15 +234,16 @@ abstract class GenericApplicationTest<K, V>
        }
 
 
-       public interface RecordGenerator<K>
+       public interface RecordGenerator
        {
                void generate(
                                int numberOfMessagesToGenerate,
-                               Set<Integer> poistionPills,
-                               Consumer<ProducerRecord<K, Bytes>> messageSender);
+                               Set<Integer> poisonPills,
+                               Set<Integer> logicErrors,
+                               Consumer<ProducerRecord<Bytes, Bytes>> messageSender);
        }
 
-       void sendMessage(ProducerRecord<K, Bytes> record)
+       void sendMessage(ProducerRecord<Bytes, Bytes> record)
        {
                testRecordProducer.send(record, (metadata, e) ->
                {
@@ -244,7 +275,7 @@ abstract class GenericApplicationTest<K, V>
                props = new Properties();
                props.put("bootstrap.servers", properties.getBootstrapServer());
                props.put("linger.ms", 100);
-               props.put("key.serializer", keySerializer.getClass().getName());
+               props.put("key.serializer", BytesSerializer.class.getName());
                props.put("value.serializer", BytesSerializer.class.getName());
                testRecordProducer = new KafkaProducer<>(props);
 
@@ -256,8 +287,6 @@ abstract class GenericApplicationTest<K, V>
                props.put("value.deserializer", BytesDeserializer.class.getName());
                offsetConsumer = new KafkaConsumer<>(props);
 
-               testHandler = record -> {} ;
-
                seekToEnd();
 
                oldOffsets = new HashMap<>();
@@ -277,7 +306,7 @@ abstract class GenericApplicationTest<K, V>
                                                        new TopicPartition(record.topic(), record.partition()),
                                                        record.offset());
                                        receivedRecords.add(record);
-                                       testHandler.accept(record);
+                                       consumer.accept(record);
                                };
 
                endlessConsumer =
@@ -309,5 +338,7 @@ abstract class GenericApplicationTest<K, V>
 
        @TestConfiguration
        @Import(ApplicationConfiguration.class)
-       public static class Configuration {}
+       public static class Configuration
+       {
+       }
 }