DLT auf Basis des `DeadLetterPublishingRecoverer` konfiguriert
[demos/kafka/training] / src / test / java / de / juplo / kafka / GenericApplicationTests.java
index 4793d96..b98066f 100644 (file)
@@ -6,7 +6,6 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.RecordDeserializationException;
 import org.apache.kafka.common.serialization.*;
 import org.apache.kafka.common.utils.Bytes;
 import org.junit.jupiter.api.*;
@@ -42,6 +41,7 @@ import static org.awaitility.Awaitility.*;
 @TestPropertySource(
                properties = {
                                "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
+                               "spring.kafka.producer.bootstrap-servers=${spring.embedded.kafka.brokers}",
                                "sumup.adder.topic=" + TOPIC,
                                "spring.kafka.consumer.auto-commit-interval=500ms",
                                "spring.mongodb.embedded.version=4.4.13" })
@@ -124,32 +124,29 @@ abstract class GenericApplicationTests<K, V>
        {
                recordGenerator.generate(true, false, messageSender);
 
-               int numberOfGeneratedMessages = recordGenerator.getNumberOfMessages();
+               int numberOfValidMessages =
+                               recordGenerator.getNumberOfMessages() -
+                               recordGenerator.getNumberOfPoisonPills();
 
-               await("Consumer failed")
+               await(numberOfValidMessages + " records received")
                                .atMost(Duration.ofSeconds(30))
                                .pollInterval(Duration.ofSeconds(1))
-                               .until(() -> !endlessConsumer.running());
-
-               checkSeenOffsetsForProgress();
-               assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
+                               .until(() -> recordHandler.receivedMessages >= numberOfValidMessages);
 
-               endlessConsumer.start();
-               await("Consumer failed")
-                               .atMost(Duration.ofSeconds(30))
+               await("Offsets committed")
+                               .atMost(Duration.ofSeconds(10))
                                .pollInterval(Duration.ofSeconds(1))
-                               .until(() -> !endlessConsumer.running());
-
-               checkSeenOffsetsForProgress();
-               assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
-               assertThat(recordHandler.receivedMessages)
-                               .describedAs("Received not all sent events")
-                               .isLessThan(numberOfGeneratedMessages);
+                               .untilAsserted(() ->
+                               {
+                                       checkSeenOffsetsForProgress();
+                                       assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
+                               });
 
                assertThat(endlessConsumer.running())
-                               .describedAs("Consumer should have exited")
-                               .isFalse();
+                               .describedAs("Consumer should still be running")
+                               .isTrue();
 
+               endlessConsumer.stop();
                recordGenerator.assertBusinessLogic();
        }
 
@@ -159,28 +156,29 @@ abstract class GenericApplicationTests<K, V>
        {
                recordGenerator.generate(false, true, messageSender);
 
-               int numberOfGeneratedMessages = recordGenerator.getNumberOfMessages();
+               int numberOfValidMessages =
+                               recordGenerator.getNumberOfMessages() -
+                               recordGenerator.getNumberOfLogicErrors();
 
-               await("Consumer failed")
+               await(numberOfValidMessages + " records received")
                                .atMost(Duration.ofSeconds(30))
                                .pollInterval(Duration.ofSeconds(1))
-                               .until(() -> !endlessConsumer.running());
-
-               checkSeenOffsetsForProgress();
-               assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
+                               .until(() -> recordHandler.receivedMessages >= numberOfValidMessages);
 
-               endlessConsumer.start();
-               await("Consumer failed")
-                               .atMost(Duration.ofSeconds(30))
+               await("Offsets committed")
+                               .atMost(Duration.ofSeconds(10))
                                .pollInterval(Duration.ofSeconds(1))
-                               .until(() -> !endlessConsumer.running());
-
-               assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
+                               .untilAsserted(() ->
+                               {
+                                       checkSeenOffsetsForProgress();
+                                       assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
+                               });
 
                assertThat(endlessConsumer.running())
-                               .describedAs("Consumer should not be running")
-                               .isFalse();
+                               .describedAs("Consumer should still be running")
+                               .isTrue();
 
+               endlessConsumer.stop();
                recordGenerator.assertBusinessLogic();
        }