X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2FGenericApplicationTests.java;h=ac8a6298fc17edc6f790f2ea58a292ed2231fe91;hb=66ff7d205e66616de8aaca94503dbbcd7d281f6d;hp=b98066fcb1c6bbea2aeabd36a95547f7c8d7f7e6;hpb=ac154bb18a6c575fe01e70cba6a86d10580dfb89;p=demos%2Fkafka%2Ftraining diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java index b98066f..ac8a629 100644 --- a/src/test/java/de/juplo/kafka/GenericApplicationTests.java +++ b/src/test/java/de/juplo/kafka/GenericApplicationTests.java @@ -2,6 +2,7 @@ package de.juplo.kafka; import com.mongodb.client.MongoClient; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; @@ -18,8 +19,10 @@ import org.springframework.boot.test.context.ConfigDataApplicationContextInitial import org.springframework.boot.test.context.TestConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Import; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerEndpointRegistry; import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.test.context.TestPropertySource; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; @@ -70,6 +73,8 @@ abstract class GenericApplicationTests @Autowired TestRecordHandler recordHandler; @Autowired + DeadLetterTopicConsumer deadLetterTopicConsumer; + @Autowired EndlessConsumer endlessConsumer; KafkaProducer testRecordProducer; @@ -132,6 +137,10 @@ abstract class GenericApplicationTests .atMost(Duration.ofSeconds(30)) .pollInterval(Duration.ofSeconds(1)) .until(() -> recordHandler.receivedMessages >= numberOfValidMessages); + await(recordGenerator.getNumberOfPoisonPills() + " poison-pills received") + .atMost(Duration.ofSeconds(30)) + .pollInterval(Duration.ofSeconds(1)) + .until(() -> deadLetterTopicConsumer.messages.size() == recordGenerator.getNumberOfPoisonPills()); await("Offsets committed") .atMost(Duration.ofSeconds(10)) @@ -164,6 +173,10 @@ abstract class GenericApplicationTests .atMost(Duration.ofSeconds(30)) .pollInterval(Duration.ofSeconds(1)) .until(() -> recordHandler.receivedMessages >= numberOfValidMessages); + await(recordGenerator.getNumberOfLogicErrors() + " logic-errors received") + .atMost(Duration.ofSeconds(30)) + .pollInterval(Duration.ofSeconds(1)) + .until(() -> deadLetterTopicConsumer.messages.size() == recordGenerator.getNumberOfLogicErrors()); await("Offsets committed") .atMost(Duration.ofSeconds(10)) @@ -348,6 +361,8 @@ abstract class GenericApplicationTests recordHandler.seenOffsets = new HashMap<>(); recordHandler.receivedMessages = 0; + deadLetterTopicConsumer.messages.clear(); + doForCurrentOffsets((tp, offset) -> { oldOffsets.put(tp, offset - 1); @@ -396,5 +411,30 @@ abstract class GenericApplicationTests { return factory.createConsumer(); } + + @Bean + public ConcurrentKafkaListenerContainerFactory dltContainerFactory( + KafkaProperties properties) + { + Map consumerProperties = new HashMap<>(); + + consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers()); + consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + DefaultKafkaConsumerFactory dltConsumerFactory = + new DefaultKafkaConsumerFactory<>(consumerProperties); + ConcurrentKafkaListenerContainerFactory factory = + new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(dltConsumerFactory); + return factory; + } + + @Bean + public DeadLetterTopicConsumer deadLetterTopicConsumer() + { + return new DeadLetterTopicConsumer(); + } } }