X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;ds=sidebyside;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2FGenericApplicationTests.java;h=d65dd8e7af64635b5adb483d2c6d12aa02193871;hb=e54f6036fecc54ea37001dae5f64d88502acfbe1;hp=753debe2b00344766c2c9a4f90286e0b6e075337;hpb=f095f71a104fcde025a63f87ba75eb5cb3136656;p=demos%2Fkafka%2Ftraining diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java index 753debe..d65dd8e 100644 --- a/src/test/java/de/juplo/kafka/GenericApplicationTests.java +++ b/src/test/java/de/juplo/kafka/GenericApplicationTests.java @@ -2,12 +2,10 @@ package de.juplo.kafka; import com.mongodb.client.MongoClient; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.RecordDeserializationException; import org.apache.kafka.common.serialization.*; import org.apache.kafka.common.utils.Bytes; import org.junit.jupiter.api.*; @@ -43,6 +41,7 @@ import static org.awaitility.Awaitility.*; @TestPropertySource( properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}", + "spring.kafka.producer.bootstrap-servers=${spring.embedded.kafka.brokers}", "sumup.adder.topic=" + TOPIC, "spring.kafka.consumer.auto-commit-interval=500ms", "spring.mongodb.embedded.version=4.4.13" }) @@ -69,9 +68,9 @@ abstract class GenericApplicationTests @Autowired KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry; @Autowired - TestRecordHandler recordHandler; + TestRecordHandler recordHandler; @Autowired - EndlessConsumer endlessConsumer; + EndlessConsumer endlessConsumer; KafkaProducer testRecordProducer; KafkaConsumer offsetConsumer; @@ -93,13 +92,14 @@ abstract class GenericApplicationTests @Test void commitsCurrentOffsetsOnSuccess() throws Exception { - int numberOfGeneratedMessages = - recordGenerator.generate(false, false, messageSender); + recordGenerator.generate(false, false, messageSender); + + int numberOfGeneratedMessages = recordGenerator.getNumberOfMessages(); await(numberOfGeneratedMessages + " records received") .atMost(Duration.ofSeconds(30)) .pollInterval(Duration.ofSeconds(1)) - .until(() -> recordHandler.receivedRecords.size() >= numberOfGeneratedMessages); + .until(() -> recordHandler.receivedMessages >= numberOfGeneratedMessages); await("Offsets committed") .atMost(Duration.ofSeconds(10)) @@ -110,9 +110,9 @@ abstract class GenericApplicationTests assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets); }); - assertThatExceptionOfType(IllegalStateException.class) - .isThrownBy(() -> endlessConsumer.exitStatus()) - .describedAs("Consumer should still be running"); + assertThat(endlessConsumer.running()) + .describedAs("Consumer should still be running") + .isTrue(); endlessConsumer.stop(); recordGenerator.assertBusinessLogic(); @@ -122,69 +122,63 @@ abstract class GenericApplicationTests @SkipWhenErrorCannotBeGenerated(poisonPill = true) void commitsOffsetOfErrorForReprocessingOnDeserializationError() { - int numberOfGeneratedMessages = - recordGenerator.generate(true, false, messageSender); - - await("Consumer failed") - .atMost(Duration.ofSeconds(30)) - .pollInterval(Duration.ofSeconds(1)) - .until(() -> !endlessConsumer.running()); + recordGenerator.generate(true, false, messageSender); - checkSeenOffsetsForProgress(); - assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets); + int numberOfValidMessages = + recordGenerator.getNumberOfMessages() - + recordGenerator.getNumberOfPoisonPills(); - endlessConsumer.start(); - await("Consumer failed") + await(numberOfValidMessages + " records received") .atMost(Duration.ofSeconds(30)) .pollInterval(Duration.ofSeconds(1)) - .until(() -> !endlessConsumer.running()); + .until(() -> recordHandler.receivedMessages >= numberOfValidMessages); - checkSeenOffsetsForProgress(); - assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets); - assertThat(recordHandler.receivedRecords.size()) - .describedAs("Received not all sent events") - .isLessThan(numberOfGeneratedMessages); + await("Offsets committed") + .atMost(Duration.ofSeconds(10)) + .pollInterval(Duration.ofSeconds(1)) + .untilAsserted(() -> + { + checkSeenOffsetsForProgress(); + assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets); + }); - assertThatNoException() - .describedAs("Consumer should not be running") - .isThrownBy(() -> endlessConsumer.exitStatus()); - assertThat(endlessConsumer.exitStatus()) - .describedAs("Consumer should have exited abnormally") - .containsInstanceOf(RecordDeserializationException.class); + assertThat(endlessConsumer.running()) + .describedAs("Consumer should still be running") + .isTrue(); + endlessConsumer.stop(); recordGenerator.assertBusinessLogic(); } @Test @SkipWhenErrorCannotBeGenerated(logicError = true) - void doesNotCommitOffsetsOnLogicError() + void commitsOffsetsOfUnseenRecordsOnLogicError() { - int numberOfGeneratedMessages = - recordGenerator.generate(false, true, messageSender); - - await("Consumer failed") - .atMost(Duration.ofSeconds(30)) - .pollInterval(Duration.ofSeconds(1)) - .until(() -> !endlessConsumer.running()); + recordGenerator.generate(false, true, messageSender); - checkSeenOffsetsForProgress(); - assertSeenOffsetsAreBehindCommittedOffsets(recordHandler.seenOffsets); + int numberOfValidMessages = + recordGenerator.getNumberOfMessages() - + recordGenerator.getNumberOfLogicErrors(); - endlessConsumer.start(); - await("Consumer failed") + await(numberOfValidMessages + " records received") .atMost(Duration.ofSeconds(30)) .pollInterval(Duration.ofSeconds(1)) - .until(() -> !endlessConsumer.running()); + .until(() -> recordHandler.receivedMessages >= numberOfValidMessages); - assertSeenOffsetsAreBehindCommittedOffsets(recordHandler.seenOffsets); + await("Offsets committed") + .atMost(Duration.ofSeconds(10)) + .pollInterval(Duration.ofSeconds(1)) + .untilAsserted(() -> + { + checkSeenOffsetsForProgress(); + assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets); + }); - assertThatNoException() - .describedAs("Consumer should not be running") - .isThrownBy(() -> endlessConsumer.exitStatus()); - assertThat(endlessConsumer.exitStatus()) - .describedAs("Consumer should have exited abnormally") - .containsInstanceOf(RuntimeException.class); + assertThat(endlessConsumer.running()) + .describedAs("Consumer should still be running") + .isTrue(); + endlessConsumer.stop(); recordGenerator.assertBusinessLogic(); } @@ -278,11 +272,15 @@ abstract class GenericApplicationTests public interface RecordGenerator { - int generate( + void generate( boolean poisonPills, boolean logicErrors, Consumer> messageSender); + int getNumberOfMessages(); + int getNumberOfPoisonPills(); + int getNumberOfLogicErrors(); + default boolean canGeneratePoisonPill() { return true; @@ -348,7 +346,7 @@ abstract class GenericApplicationTests oldOffsets = new HashMap<>(); recordHandler.seenOffsets = new HashMap<>(); - recordHandler.receivedRecords = new HashSet<>(); + recordHandler.receivedMessages = 0; doForCurrentOffsets((tp, offset) -> { @@ -393,10 +391,16 @@ abstract class GenericApplicationTests return new TestRecordHandler(applicationRecordHandler); } - @Bean(destroyMethod = "close") - public org.apache.kafka.clients.consumer.Consumer kafkaConsumer(ConsumerFactory factory) - { - return factory.createConsumer(); - } + @Bean(destroyMethod = "close") + public org.apache.kafka.clients.consumer.Consumer kafkaConsumer(ConsumerFactory factory) + { + return factory.createConsumer(); + } + + @Bean + public DeadLetterTopicConsumer deadLetterTopicConsumer() + { + return new DeadLetterTopicConsumer(); + } } }