diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java
index 21c3f7f..ac8a629 100644
--- a/src/test/java/de/juplo/kafka/GenericApplicationTests.java
+++ b/src/test/java/de/juplo/kafka/GenericApplicationTests.java
@@ -2,32 +2,33 @@ package de.juplo.kafka;
 
 import com.mongodb.client.MongoClient;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.RecordDeserializationException;
 import org.apache.kafka.common.serialization.*;
 import org.apache.kafka.common.utils.Bytes;
 import org.junit.jupiter.api.*;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
-import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
 import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
 import org.springframework.boot.autoconfigure.mongo.MongoProperties;
 import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo;
 import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer;
 import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Import;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
+import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
 import org.springframework.kafka.test.context.EmbeddedKafka;
 import org.springframework.test.context.TestPropertySource;
 import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
 
 import java.time.Duration;
 import java.util.*;
-import java.util.concurrent.ExecutorService;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
@@ -39,14 +40,11 @@
 import static org.assertj.core.api.Assertions.*;
 import static org.awaitility.Awaitility.*;
 
-@SpringJUnitConfig(
-    initializers = ConfigDataApplicationContextInitializer.class,
-    classes = {
-        KafkaAutoConfiguration.class,
-        ApplicationTests.Configuration.class })
+@SpringJUnitConfig(initializers = ConfigDataApplicationContextInitializer.class)
 @TestPropertySource(
     properties = {
         "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
+        "spring.kafka.producer.bootstrap-servers=${spring.embedded.kafka.brokers}",
         "sumup.adder.topic=" + TOPIC,
         "spring.kafka.consumer.auto-commit-interval=500ms",
         "spring.mongodb.embedded.version=4.4.13" })
@@ -63,28 +61,25 @@ abstract class GenericApplicationTests<K, V>
   @Autowired
   org.apache.kafka.clients.consumer.Consumer<K, V> kafkaConsumer;
   @Autowired
-  Consumer<ConsumerRecord<K, V>> consumer;
-  @Autowired
-  ApplicationProperties applicationProperties;
-  @Autowired
   KafkaProperties kafkaProperties;
   @Autowired
-  ExecutorService executor;
+  ApplicationProperties applicationProperties;
   @Autowired
   MongoClient mongoClient;
   @Autowired
   MongoProperties mongoProperties;
   @Autowired
-  ConsumerRebalanceListener rebalanceListener;
+  KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
   @Autowired
-  RecordHandler<K, V> recordHandler;
+  TestRecordHandler<K, V> recordHandler;
+  @Autowired
+  DeadLetterTopicConsumer deadLetterTopicConsumer;
+  @Autowired
+  EndlessConsumer endlessConsumer;
 
   KafkaProducer<Bytes, Bytes> testRecordProducer;
   KafkaConsumer<Bytes, Bytes> offsetConsumer;
-  EndlessConsumer<K, V> endlessConsumer;
   Map<TopicPartition, Long> oldOffsets;
-  Map<TopicPartition, Long> seenOffsets;
-  Set<ConsumerRecord<K, V>> receivedRecords;
 
   final RecordGenerator recordGenerator;
@@ -102,13 +97,14 @@ abstract class GenericApplicationTests<K, V>
   @Test
   void commitsCurrentOffsetsOnSuccess() throws Exception
   {
-    int numberOfGeneratedMessages =
-      recordGenerator.generate(false, false, messageSender);
+    recordGenerator.generate(false, false, messageSender);
+
+    int numberOfGeneratedMessages = recordGenerator.getNumberOfMessages();
 
     await(numberOfGeneratedMessages + " records received")
       .atMost(Duration.ofSeconds(30))
       .pollInterval(Duration.ofSeconds(1))
-      .until(() -> receivedRecords.size() >= numberOfGeneratedMessages);
+      .until(() -> recordHandler.receivedMessages >= numberOfGeneratedMessages);
 
     await("Offsets committed")
       .atMost(Duration.ofSeconds(10))
@@ -116,12 +112,12 @@ abstract class GenericApplicationTests<K, V>
       .pollInterval(Duration.ofSeconds(1))
       .untilAsserted(() ->
       {
         checkSeenOffsetsForProgress();
-        assertSeenOffsetsEqualCommittedOffsets(seenOffsets);
+        assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
       });
 
-    assertThatExceptionOfType(IllegalStateException.class)
-      .isThrownBy(() -> endlessConsumer.exitStatus())
-      .describedAs("Consumer should still be running");
+    assertThat(endlessConsumer.running())
+      .describedAs("Consumer should still be running")
+      .isTrue();
 
     endlessConsumer.stop();
     recordGenerator.assertBusinessLogic();
   }
@@ -131,69 +127,71 @@ abstract class GenericApplicationTests<K, V>
   @Test
   @SkipWhenErrorCannotBeGenerated(poisonPill = true)
   void commitsOffsetOfErrorForReprocessingOnDeserializationError()
   {
-    int numberOfGeneratedMessages =
-      recordGenerator.generate(true, false, messageSender);
+    recordGenerator.generate(true, false, messageSender);
+
+    int numberOfValidMessages =
+      recordGenerator.getNumberOfMessages() -
+      recordGenerator.getNumberOfPoisonPills();
 
-    await("Consumer failed")
+    await(numberOfValidMessages + " records received")
       .atMost(Duration.ofSeconds(30))
       .pollInterval(Duration.ofSeconds(1))
-      .until(() -> !endlessConsumer.running());
-
-    checkSeenOffsetsForProgress();
-    assertSeenOffsetsEqualCommittedOffsets(seenOffsets);
-
-    endlessConsumer.start();
-    await("Consumer failed")
+      .until(() -> recordHandler.receivedMessages >= numberOfValidMessages);
+    await(recordGenerator.getNumberOfPoisonPills() + " poison-pills received")
       .atMost(Duration.ofSeconds(30))
       .pollInterval(Duration.ofSeconds(1))
-      .until(() -> !endlessConsumer.running());
+      .until(() -> deadLetterTopicConsumer.messages.size() == recordGenerator.getNumberOfPoisonPills());
 
-    checkSeenOffsetsForProgress();
-    assertSeenOffsetsEqualCommittedOffsets(seenOffsets);
-    assertThat(receivedRecords.size())
-      .describedAs("Received not all sent events")
-      .isLessThan(numberOfGeneratedMessages);
+    await("Offsets committed")
+      .atMost(Duration.ofSeconds(10))
+      .pollInterval(Duration.ofSeconds(1))
+      .untilAsserted(() ->
+      {
+        checkSeenOffsetsForProgress();
+        assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
+      });
 
-    assertThatNoException()
-      .describedAs("Consumer should not be running")
-      .isThrownBy(() -> endlessConsumer.exitStatus());
-    assertThat(endlessConsumer.exitStatus())
-      .describedAs("Consumer should have exited abnormally")
-      .containsInstanceOf(RecordDeserializationException.class);
+    assertThat(endlessConsumer.running())
+      .describedAs("Consumer should still be running")
+      .isTrue();
 
+    endlessConsumer.stop();
     recordGenerator.assertBusinessLogic();
   }
 
   @Test
   @SkipWhenErrorCannotBeGenerated(logicError = true)
-  void doesNotCommitOffsetsOnLogicError()
+  void commitsOffsetsOfUnseenRecordsOnLogicError()
   {
-    int numberOfGeneratedMessages =
-      recordGenerator.generate(false, true, messageSender);
+    recordGenerator.generate(false, true, messageSender);
 
-    await("Consumer failed")
+    int numberOfValidMessages =
+      recordGenerator.getNumberOfMessages() -
+      recordGenerator.getNumberOfLogicErrors();
+
+    await(numberOfValidMessages + " records received")
       .atMost(Duration.ofSeconds(30))
      .pollInterval(Duration.ofSeconds(1))
-      .until(() -> !endlessConsumer.running());
-
-    checkSeenOffsetsForProgress();
-    assertSeenOffsetsAreBehindCommittedOffsets(seenOffsets);
-
-    endlessConsumer.start();
-    await("Consumer failed")
+      .until(() -> recordHandler.receivedMessages >= numberOfValidMessages);
+    await(recordGenerator.getNumberOfLogicErrors() + " logic-errors received")
       .atMost(Duration.ofSeconds(30))
       .pollInterval(Duration.ofSeconds(1))
-      .until(() -> !endlessConsumer.running());
+      .until(() -> deadLetterTopicConsumer.messages.size() == recordGenerator.getNumberOfLogicErrors());
 
-    assertSeenOffsetsAreBehindCommittedOffsets(seenOffsets);
+    await("Offsets committed")
+      .atMost(Duration.ofSeconds(10))
+      .pollInterval(Duration.ofSeconds(1))
+      .untilAsserted(() ->
+      {
+        checkSeenOffsetsForProgress();
+        assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
+      });
 
-    assertThatNoException()
-      .describedAs("Consumer should not be running")
-      .isThrownBy(() -> endlessConsumer.exitStatus());
-    assertThat(endlessConsumer.exitStatus())
-      .describedAs("Consumer should have exited abnormally")
-      .containsInstanceOf(RuntimeException.class);
+    assertThat(endlessConsumer.running())
+      .describedAs("Consumer should still be running")
+      .isTrue();
 
+    endlessConsumer.stop();
     recordGenerator.assertBusinessLogic();
   }
@@ -238,7 +236,7 @@ abstract class GenericApplicationTests<K, V>
     partitions().forEach(tp ->
     {
       Long oldOffset = oldOffsets.get(tp) + 1;
-      Long newOffset = seenOffsets.get(tp) + 1;
+      Long newOffset = recordHandler.seenOffsets.get(tp) + 1;
       if (!oldOffset.equals(newOffset))
       {
         log.debug("Progress for {}: {} -> {}", tp, oldOffset, newOffset);
@@ -287,11 +285,15 @@ abstract class GenericApplicationTests<K, V>
   public interface RecordGenerator
   {
-    int generate(
+    void generate(
       boolean poisonPills,
       boolean logicErrors,
       Consumer<ProducerRecord<Bytes, Bytes>> messageSender);
 
+    int getNumberOfMessages();
+    int getNumberOfPoisonPills();
+    int getNumberOfLogicErrors();
+
     default boolean canGeneratePoisonPill()
     {
       return true;
     }
@@ -356,43 +358,32 @@ abstract class GenericApplicationTests<K, V>
     seekToEnd();
 
     oldOffsets = new HashMap<>();
-    seenOffsets = new HashMap<>();
-    receivedRecords = new HashSet<>();
+    recordHandler.seenOffsets = new HashMap<>();
+    recordHandler.receivedMessages = 0;
+
+    deadLetterTopicConsumer.messages.clear();
 
     doForCurrentOffsets((tp, offset) ->
     {
       oldOffsets.put(tp, offset - 1);
-      seenOffsets.put(tp, offset - 1);
+      recordHandler.seenOffsets.put(tp, offset - 1);
     });
 
-    TestRecordHandler<K, V> captureOffsetAndExecuteTestHandler =
-      new TestRecordHandler<K, V>(recordHandler)
-      {
-        @Override
-        public void onNewRecord(ConsumerRecord<K, V> record)
-        {
-          seenOffsets.put(
-            new TopicPartition(record.topic(), record.partition()),
-            record.offset());
-          receivedRecords.add(record);
-        }
-      };
-
-    endlessConsumer =
-      new EndlessConsumer<>(
-        executor,
-        kafkaProperties.getClientId(),
-        applicationProperties.getTopic(),
-        kafkaConsumer,
-        rebalanceListener,
-        captureOffsetAndExecuteTestHandler);
-
     endlessConsumer.start();
   }
 
   @AfterEach
   public void deinit()
   {
+    try
+    {
+      endlessConsumer.stop();
+    }
+    catch (Exception e)
+    {
+      log.debug("{}", e.toString());
+    }
+
     try
     {
       testRecordProducer.close();
@@ -409,5 +400,41 @@ abstract class GenericApplicationTests<K, V>
   @Import(ApplicationConfiguration.class)
   public static class Configuration
   {
+    @Bean
+    public RecordHandler recordHandler(RecordHandler applicationRecordHandler)
+    {
+      return new TestRecordHandler(applicationRecordHandler);
+    }
+
+    @Bean(destroyMethod = "close")
+    public org.apache.kafka.clients.consumer.Consumer kafkaConsumer(ConsumerFactory factory)
+    {
+      return factory.createConsumer();
+    }
+
+    @Bean
+    public ConcurrentKafkaListenerContainerFactory<String, String> dltContainerFactory(
+        KafkaProperties properties)
+    {
+      Map<String, Object> consumerProperties = new HashMap<>();
+
+      consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers());
+      consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+      consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+      consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+      DefaultKafkaConsumerFactory<String, String> dltConsumerFactory =
+        new DefaultKafkaConsumerFactory<>(consumerProperties);
+      ConcurrentKafkaListenerContainerFactory<String, String> factory =
+        new ConcurrentKafkaListenerContainerFactory<>();
+      factory.setConsumerFactory(dltConsumerFactory);
+      return factory;
+    }
+
+    @Bean
+    public DeadLetterTopicConsumer deadLetterTopicConsumer()
+    {
+      return new DeadLetterTopicConsumer();
+    }
   }
 }
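Note: the DeadLetterTopicConsumer that the tests autowire and the Configuration instantiates is added elsewhere in this commit, so its source is not part of this diff. For orientation, a minimal sketch of the shape the tests rely on (a bean collecting every record that arrives on the dead-letter topic into a list that the tests can poll and clear). The listener id, the ".DLT" topic suffix (the default naming scheme of spring-kafka's DeadLetterPublishingRecoverer), and the String payload type are assumptions inferred from the dltContainerFactory bean above, not the actual implementation:

    package de.juplo.kafka;

    import lombok.extern.slf4j.Slf4j;
    import org.springframework.kafka.annotation.KafkaListener;

    import java.util.LinkedList;
    import java.util.List;

    @Slf4j
    public class DeadLetterTopicConsumer
    {
      // Polled via messages.size() and reset via messages.clear() by the tests
      final List<String> messages = new LinkedList<>();

      // Consumes the dead-letter topic with the String-deserializing consumer
      // created by the dltContainerFactory bean from the test configuration
      @KafkaListener(
          id = "dlt",
          topics = "${sumup.adder.topic}.DLT", // assumed topic name
          containerFactory = "dltContainerFactory")
      public void receive(String message)
      {
        log.info("received message on the dead-letter topic: {}", message);
        messages.add(message);
      }
    }

A plain list is enough here because the tests only compare messages.size() against the number of generated poison pills or logic errors via Awaitility and reset the list in init().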