X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2FGenericApplicationTests.java;h=ac8a6298fc17edc6f790f2ea58a292ed2231fe91;hb=66ff7d205e66616de8aaca94503dbbcd7d281f6d;hp=4793d960e0530b257f0a5c20313a8179a3156bfc;hpb=7a7926c1799495a3ed016cb1b204cbfe13f833f1;p=demos%2Fkafka%2Ftraining

diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java
index 4793d96..ac8a629 100644
--- a/src/test/java/de/juplo/kafka/GenericApplicationTests.java
+++ b/src/test/java/de/juplo/kafka/GenericApplicationTests.java
@@ -2,11 +2,11 @@ package de.juplo.kafka;
 
 import com.mongodb.client.MongoClient;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.RecordDeserializationException;
 import org.apache.kafka.common.serialization.*;
 import org.apache.kafka.common.utils.Bytes;
 import org.junit.jupiter.api.*;
@@ -19,8 +19,10 @@ import org.springframework.boot.test.context.ConfigDataApplicationContextInitial
 import org.springframework.boot.test.context.TestConfiguration;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Import;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
 import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
 import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
 import org.springframework.kafka.test.context.EmbeddedKafka;
 import org.springframework.test.context.TestPropertySource;
 import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
@@ -42,6 +44,7 @@ import static org.awaitility.Awaitility.*;
 @TestPropertySource(
     properties = {
         "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
+        "spring.kafka.producer.bootstrap-servers=${spring.embedded.kafka.brokers}",
         "sumup.adder.topic=" + TOPIC,
         "spring.kafka.consumer.auto-commit-interval=500ms",
         "spring.mongodb.embedded.version=4.4.13" })
@@ -70,6 +73,8 @@ abstract class GenericApplicationTests<K, V>
   @Autowired
   TestRecordHandler<K, V> recordHandler;
   @Autowired
+  DeadLetterTopicConsumer deadLetterTopicConsumer;
+  @Autowired
   EndlessConsumer endlessConsumer;
 
   KafkaProducer<Bytes, Bytes> testRecordProducer;
@@ -124,32 +129,33 @@ abstract class GenericApplicationTests<K, V>
   {
     recordGenerator.generate(true, false, messageSender);
 
-    int numberOfGeneratedMessages = recordGenerator.getNumberOfMessages();
+    int numberOfValidMessages =
+        recordGenerator.getNumberOfMessages() -
+        recordGenerator.getNumberOfPoisonPills();
 
-    await("Consumer failed")
+    await(numberOfValidMessages + " records received")
       .atMost(Duration.ofSeconds(30))
       .pollInterval(Duration.ofSeconds(1))
-      .until(() -> !endlessConsumer.running());
-
-    checkSeenOffsetsForProgress();
-    assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
-
-    endlessConsumer.start();
-    await("Consumer failed")
+      .until(() -> recordHandler.receivedMessages >= numberOfValidMessages);
+    await(recordGenerator.getNumberOfPoisonPills() + " poison-pills received")
       .atMost(Duration.ofSeconds(30))
       .pollInterval(Duration.ofSeconds(1))
-      .until(() -> !endlessConsumer.running());
+      .until(() -> deadLetterTopicConsumer.messages.size() ==
+          recordGenerator.getNumberOfPoisonPills());
 
-    checkSeenOffsetsForProgress();
-    assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
-    assertThat(recordHandler.receivedMessages)
-        .describedAs("Received not all sent events")
-        .isLessThan(numberOfGeneratedMessages);
+    await("Offsets committed")
+        .atMost(Duration.ofSeconds(10))
+        .pollInterval(Duration.ofSeconds(1))
+        .untilAsserted(() ->
+        {
+          checkSeenOffsetsForProgress();
+          assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
+        });
 
     assertThat(endlessConsumer.running())
-        .describedAs("Consumer should have exited")
-        .isFalse();
+        .describedAs("Consumer should still be running")
+        .isTrue();
+    endlessConsumer.stop();
 
     recordGenerator.assertBusinessLogic();
   }
@@ -159,28 +165,33 @@ abstract class GenericApplicationTests<K, V>
   {
     recordGenerator.generate(false, true, messageSender);
 
-    int numberOfGeneratedMessages = recordGenerator.getNumberOfMessages();
+    int numberOfValidMessages =
+        recordGenerator.getNumberOfMessages() -
+        recordGenerator.getNumberOfLogicErrors();
 
-    await("Consumer failed")
+    await(numberOfValidMessages + " records received")
       .atMost(Duration.ofSeconds(30))
       .pollInterval(Duration.ofSeconds(1))
-      .until(() -> !endlessConsumer.running());
-
-    checkSeenOffsetsForProgress();
-    assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
-
-    endlessConsumer.start();
-    await("Consumer failed")
+      .until(() -> recordHandler.receivedMessages >= numberOfValidMessages);
+    await(recordGenerator.getNumberOfLogicErrors() + " logic-errors received")
       .atMost(Duration.ofSeconds(30))
       .pollInterval(Duration.ofSeconds(1))
-      .until(() -> !endlessConsumer.running());
+      .until(() -> deadLetterTopicConsumer.messages.size() == recordGenerator.getNumberOfLogicErrors());
 
-    assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
+    await("Offsets committed")
+        .atMost(Duration.ofSeconds(10))
+        .pollInterval(Duration.ofSeconds(1))
+        .untilAsserted(() ->
+        {
+          checkSeenOffsetsForProgress();
+          assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
+        });
 
     assertThat(endlessConsumer.running())
-        .describedAs("Consumer should not be running")
-        .isFalse();
+        .describedAs("Consumer should still be running")
+        .isTrue();
+    endlessConsumer.stop();
 
     recordGenerator.assertBusinessLogic();
   }
@@ -350,6 +361,8 @@ abstract class GenericApplicationTests<K, V>
 
     recordHandler.seenOffsets = new HashMap<>();
     recordHandler.receivedMessages = 0;
 
+    deadLetterTopicConsumer.messages.clear();
+
     doForCurrentOffsets((tp, offset) ->
     {
       oldOffsets.put(tp, offset - 1);
@@ -398,5 +411,30 @@ abstract class GenericApplicationTests<K, V>
     {
       return factory.createConsumer();
     }
+
+    @Bean
+    public ConcurrentKafkaListenerContainerFactory<String, String> dltContainerFactory(
+        KafkaProperties properties)
+    {
+      Map<String, Object> consumerProperties = new HashMap<>();
+
+      consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers());
+      consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+      consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+      consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+      DefaultKafkaConsumerFactory<String, String> dltConsumerFactory =
+          new DefaultKafkaConsumerFactory<>(consumerProperties);
+      ConcurrentKafkaListenerContainerFactory<String, String> factory =
+          new ConcurrentKafkaListenerContainerFactory<>();
+      factory.setConsumerFactory(dltConsumerFactory);
+      return factory;
+    }
+
+    @Bean
+    public DeadLetterTopicConsumer deadLetterTopicConsumer()
+    {
+      return new DeadLetterTopicConsumer();
+    }
   }
 }
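
Note: the DeadLetterTopicConsumer that the test autowires and that the last hunk registers as a bean is defined in a separate file of this commit, so its body does not appear in this diff. The following is a minimal sketch of the shape these tests rely on: a listener that collects every record from the dead-letter topic into a messages collection that the tests poll and clear. The listener id, the ${sumup.adder.topic}.DLT topic name (Spring Kafka's default ".DLT" suffix), and the plain String payload are assumptions, not taken from this diff.

package de.juplo.kafka;

import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;

import java.util.LinkedList;
import java.util.List;

@Slf4j
public class DeadLetterTopicConsumer
{
  // Polled via size() and reset via clear() by the tests above
  final List<String> messages = new LinkedList<>();

  // Hypothetical listener: consumes the dead-letter topic with the
  // dltContainerFactory configured in the diff (String key/value
  // deserializers, auto.offset.reset=earliest)
  @KafkaListener(
      id = "dlt",
      topics = "${sumup.adder.topic}.DLT",
      containerFactory = "dltContainerFactory")
  public void receive(String message)
  {
    log.info("received message from the dead-letter topic: {}", message);
    messages.add(message);
  }
}

With a bean of this shape in place, the poison pills and logic errors that the application republishes to the dead-letter topic accumulate in messages, which is exactly what the "poison-pills received" and "logic-errors received" await blocks poll before the offset assertions run.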