Der Test prüft die Anzahl der Einträge im DLT
[demos/kafka/training] / src / test / java / de / juplo / kafka / GenericApplicationTests.java
index 4793d96..003a178 100644 (file)
@@ -6,7 +6,6 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.RecordDeserializationException;
 import org.apache.kafka.common.serialization.*;
 import org.apache.kafka.common.utils.Bytes;
 import org.junit.jupiter.api.*;
@@ -42,6 +41,7 @@ import static org.awaitility.Awaitility.*;
 @TestPropertySource(
                properties = {
                                "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
+                               "spring.kafka.producer.bootstrap-servers=${spring.embedded.kafka.brokers}",
                                "sumup.adder.topic=" + TOPIC,
                                "spring.kafka.consumer.auto-commit-interval=500ms",
                                "spring.mongodb.embedded.version=4.4.13" })
@@ -70,6 +70,8 @@ abstract class GenericApplicationTests<K, V>
        @Autowired
        TestRecordHandler recordHandler;
        @Autowired
+       DeadLetterTopicConsumer deadLetterTopicConsumer;
+       @Autowired
        EndlessConsumer endlessConsumer;
 
        KafkaProducer<Bytes, Bytes> testRecordProducer;
@@ -124,32 +126,33 @@ abstract class GenericApplicationTests<K, V>
        {
                recordGenerator.generate(true, false, messageSender);
 
-               int numberOfGeneratedMessages = recordGenerator.getNumberOfMessages();
+               int numberOfValidMessages =
+                               recordGenerator.getNumberOfMessages() -
+                               recordGenerator.getNumberOfPoisonPills();
 
-               await("Consumer failed")
+               await(numberOfValidMessages + " records received")
                                .atMost(Duration.ofSeconds(30))
                                .pollInterval(Duration.ofSeconds(1))
-                               .until(() -> !endlessConsumer.running());
-
-               checkSeenOffsetsForProgress();
-               assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
-
-               endlessConsumer.start();
-               await("Consumer failed")
+                               .until(() -> recordHandler.receivedMessages >= numberOfValidMessages);
+               await(recordGenerator.getNumberOfPoisonPills() + " poison-pills received")
                                .atMost(Duration.ofSeconds(30))
                                .pollInterval(Duration.ofSeconds(1))
-                               .until(() -> !endlessConsumer.running());
+                               .until(() -> deadLetterTopicConsumer.messages.size() == recordGenerator.getNumberOfPoisonPills());
 
-               checkSeenOffsetsForProgress();
-               assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
-               assertThat(recordHandler.receivedMessages)
-                               .describedAs("Received not all sent events")
-                               .isLessThan(numberOfGeneratedMessages);
+               await("Offsets committed")
+                               .atMost(Duration.ofSeconds(10))
+                               .pollInterval(Duration.ofSeconds(1))
+                               .untilAsserted(() ->
+                               {
+                                       checkSeenOffsetsForProgress();
+                                       assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
+                               });
 
                assertThat(endlessConsumer.running())
-                               .describedAs("Consumer should have exited")
-                               .isFalse();
+                               .describedAs("Consumer should still be running")
+                               .isTrue();
 
+               endlessConsumer.stop();
                recordGenerator.assertBusinessLogic();
        }
 
@@ -159,28 +162,33 @@ abstract class GenericApplicationTests<K, V>
        {
                recordGenerator.generate(false, true, messageSender);
 
-               int numberOfGeneratedMessages = recordGenerator.getNumberOfMessages();
+               int numberOfValidMessages =
+                               recordGenerator.getNumberOfMessages() -
+                               recordGenerator.getNumberOfLogicErrors();
 
-               await("Consumer failed")
+               await(numberOfValidMessages + " records received")
                                .atMost(Duration.ofSeconds(30))
                                .pollInterval(Duration.ofSeconds(1))
-                               .until(() -> !endlessConsumer.running());
-
-               checkSeenOffsetsForProgress();
-               assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
-
-               endlessConsumer.start();
-               await("Consumer failed")
+                               .until(() -> recordHandler.receivedMessages >= numberOfValidMessages);
+               await(recordGenerator.getNumberOfLogicErrors() + " logic-errors received")
                                .atMost(Duration.ofSeconds(30))
                                .pollInterval(Duration.ofSeconds(1))
-                               .until(() -> !endlessConsumer.running());
+                               .until(() -> deadLetterTopicConsumer.messages.size() == recordGenerator.getNumberOfLogicErrors());
 
-               assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
+               await("Offsets committed")
+                               .atMost(Duration.ofSeconds(10))
+                               .pollInterval(Duration.ofSeconds(1))
+                               .untilAsserted(() ->
+                               {
+                                       checkSeenOffsetsForProgress();
+                                       assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
+                               });
 
                assertThat(endlessConsumer.running())
-                               .describedAs("Consumer should not be running")
-                               .isFalse();
+                               .describedAs("Consumer should still be running")
+                               .isTrue();
 
+               endlessConsumer.stop();
                recordGenerator.assertBusinessLogic();
        }
 
@@ -350,6 +358,8 @@ abstract class GenericApplicationTests<K, V>
                recordHandler.seenOffsets = new HashMap<>();
                recordHandler.receivedMessages = 0;
 
+               deadLetterTopicConsumer.messages.clear();
+
                doForCurrentOffsets((tp, offset) ->
                {
                        oldOffsets.put(tp, offset - 1);
@@ -398,5 +408,11 @@ abstract class GenericApplicationTests<K, V>
                {
                        return factory.createConsumer();
                }
+
+               @Bean
+               public DeadLetterTopicConsumer deadLetterTopicConsumer()
+               {
+                       return new DeadLetterTopicConsumer();
+               }
        }
 }