X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2FGenericApplicationTests.java;fp=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2FGenericApplicationTests.java;h=b019373250d2c5c15062d43c4b262d5d7486f5e8;hb=d2eb370acf1a2195c36421ffc471f67cb4a8e86e;hp=9a6f8121382626bd21a41082289e8cd46c5ed4f3;hpb=ecadcd52ab7fe117fef3450a27b9d6ecdf621716;p=demos%2Fkafka%2Ftraining diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java index 9a6f812..b019373 100644 --- a/src/test/java/de/juplo/kafka/GenericApplicationTests.java +++ b/src/test/java/de/juplo/kafka/GenericApplicationTests.java @@ -1,5 +1,6 @@ package de.juplo.kafka; +import com.mongodb.client.MongoClient; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -12,6 +13,7 @@ import org.apache.kafka.common.utils.Bytes; import org.junit.jupiter.api.*; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.autoconfigure.mongo.MongoProperties; import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo; import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer; import org.springframework.boot.test.context.TestConfiguration; @@ -39,7 +41,7 @@ import static org.awaitility.Awaitility.*; properties = { "sumup.adder.bootstrap-server=${spring.embedded.kafka.brokers}", "sumup.adder.topic=" + TOPIC, - "sumup.adder.commit-interval=1s", + "sumup.adder.commit-interval=500ms", "spring.mongodb.embedded.version=4.4.13" }) @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS) @EnableAutoConfiguration @@ -62,6 +64,10 @@ abstract class GenericApplicationTests @Autowired StateRepository stateRepository; @Autowired + MongoClient mongoClient; + @Autowired + MongoProperties mongoProperties; + @Autowired PollIntervalAwareConsumerRebalanceListener rebalanceListener; @Autowired RecordHandler recordHandler; @@ -173,9 +179,6 @@ abstract class GenericApplicationTests checkSeenOffsetsForProgress(); compareToCommitedOffsets(oldOffsets); - assertThat(receivedRecords.size()) - .describedAs("Received not all sent events") - .isLessThan(numberOfGeneratedMessages); assertThatNoException() .describedAs("Consumer should not be running") @@ -329,6 +332,7 @@ abstract class GenericApplicationTests props.put("value.deserializer", BytesDeserializer.class.getName()); offsetConsumer = new KafkaConsumer<>(props); + mongoClient.getDatabase(mongoProperties.getDatabase()).drop(); seekToEnd(); oldOffsets = new HashMap<>();