X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2FGenericApplicationTests.java;h=d65dd8e7af64635b5adb483d2c6d12aa02193871;hb=e54f6036fecc54ea37001dae5f64d88502acfbe1;hp=66a80ad442a433a32d958dcdbbe67931f1107c55;hpb=d8ec8ee7ea93e801051ce3cd6f83db2aa20e6b95;p=demos%2Fkafka%2Ftraining diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java index 66a80ad..d65dd8e 100644 --- a/src/test/java/de/juplo/kafka/GenericApplicationTests.java +++ b/src/test/java/de/juplo/kafka/GenericApplicationTests.java @@ -6,7 +6,6 @@ import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.RecordDeserializationException; import org.apache.kafka.common.serialization.*; import org.apache.kafka.common.utils.Bytes; import org.junit.jupiter.api.*; @@ -42,6 +41,7 @@ import static org.awaitility.Awaitility.*; @TestPropertySource( properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}", + "spring.kafka.producer.bootstrap-servers=${spring.embedded.kafka.brokers}", "sumup.adder.topic=" + TOPIC, "spring.kafka.consumer.auto-commit-interval=500ms", "spring.mongodb.embedded.version=4.4.13" }) @@ -92,8 +92,9 @@ abstract class GenericApplicationTests @Test void commitsCurrentOffsetsOnSuccess() throws Exception { - int numberOfGeneratedMessages = - recordGenerator.generate(false, false, messageSender); + recordGenerator.generate(false, false, messageSender); + + int numberOfGeneratedMessages = recordGenerator.getNumberOfMessages(); await(numberOfGeneratedMessages + " records received") .atMost(Duration.ofSeconds(30)) @@ -121,33 +122,31 @@ abstract class GenericApplicationTests @SkipWhenErrorCannotBeGenerated(poisonPill = true) void commitsOffsetOfErrorForReprocessingOnDeserializationError() { - int numberOfGeneratedMessages = - recordGenerator.generate(true, false, messageSender); + recordGenerator.generate(true, false, messageSender); - await("Consumer failed") - .atMost(Duration.ofSeconds(30)) - .pollInterval(Duration.ofSeconds(1)) - .until(() -> !endlessConsumer.running()); + int numberOfValidMessages = + recordGenerator.getNumberOfMessages() - + recordGenerator.getNumberOfPoisonPills(); - checkSeenOffsetsForProgress(); - assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets); - - endlessConsumer.start(); - await("Consumer failed") + await(numberOfValidMessages + " records received") .atMost(Duration.ofSeconds(30)) .pollInterval(Duration.ofSeconds(1)) - .until(() -> !endlessConsumer.running()); + .until(() -> recordHandler.receivedMessages >= numberOfValidMessages); - checkSeenOffsetsForProgress(); - assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets); - assertThat(recordHandler.receivedMessages) - .describedAs("Received not all sent events") - .isLessThan(numberOfGeneratedMessages); + await("Offsets committed") + .atMost(Duration.ofSeconds(10)) + .pollInterval(Duration.ofSeconds(1)) + .untilAsserted(() -> + { + checkSeenOffsetsForProgress(); + assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets); + }); assertThat(endlessConsumer.running()) - .describedAs("Consumer should have exited") - .isFalse(); + .describedAs("Consumer should still be running") + .isTrue(); + endlessConsumer.stop(); recordGenerator.assertBusinessLogic(); } @@ -155,29 +154,31 @@ abstract class GenericApplicationTests @SkipWhenErrorCannotBeGenerated(logicError = true) void commitsOffsetsOfUnseenRecordsOnLogicError() { - int numberOfGeneratedMessages = - recordGenerator.generate(false, true, messageSender); - - await("Consumer failed") - .atMost(Duration.ofSeconds(30)) - .pollInterval(Duration.ofSeconds(1)) - .until(() -> !endlessConsumer.running()); + recordGenerator.generate(false, true, messageSender); - checkSeenOffsetsForProgress(); - assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets); + int numberOfValidMessages = + recordGenerator.getNumberOfMessages() - + recordGenerator.getNumberOfLogicErrors(); - endlessConsumer.start(); - await("Consumer failed") + await(numberOfValidMessages + " records received") .atMost(Duration.ofSeconds(30)) .pollInterval(Duration.ofSeconds(1)) - .until(() -> !endlessConsumer.running()); + .until(() -> recordHandler.receivedMessages >= numberOfValidMessages); - assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets); + await("Offsets committed") + .atMost(Duration.ofSeconds(10)) + .pollInterval(Duration.ofSeconds(1)) + .untilAsserted(() -> + { + checkSeenOffsetsForProgress(); + assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets); + }); assertThat(endlessConsumer.running()) - .describedAs("Consumer should not be running") - .isFalse(); + .describedAs("Consumer should still be running") + .isTrue(); + endlessConsumer.stop(); recordGenerator.assertBusinessLogic(); } @@ -271,11 +272,15 @@ abstract class GenericApplicationTests public interface RecordGenerator { - int generate( + void generate( boolean poisonPills, boolean logicErrors, Consumer> messageSender); + int getNumberOfMessages(); + int getNumberOfPoisonPills(); + int getNumberOfLogicErrors(); + default boolean canGeneratePoisonPill() { return true; @@ -391,5 +396,11 @@ abstract class GenericApplicationTests { return factory.createConsumer(); } + + @Bean + public DeadLetterTopicConsumer deadLetterTopicConsumer() + { + return new DeadLetterTopicConsumer(); + } } }