X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2FGenericApplicationTests.java;h=e16aea7c3c21d26b1ae14c1c8b3ba67a5bcd468c;hb=refs%2Ftags%2Fwip-merge-deserialization--sumup-adder--ohne-stored-offsets;hp=4883f757a5a659da5281bb16709c65127e367b56;hpb=66f52b33cb2380a7449ba6135f1b94a7bbe4d57d;p=demos%2Fkafka%2Ftraining diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java index 4883f75..e16aea7 100644 --- a/src/test/java/de/juplo/kafka/GenericApplicationTests.java +++ b/src/test/java/de/juplo/kafka/GenericApplicationTests.java @@ -1,5 +1,6 @@ package de.juplo.kafka; +import com.mongodb.client.MongoClient; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -11,6 +12,9 @@ import org.apache.kafka.common.serialization.*; import org.apache.kafka.common.utils.Bytes; import org.junit.jupiter.api.*; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.autoconfigure.mongo.MongoProperties; +import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo; import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer; import org.springframework.boot.test.context.TestConfiguration; import org.springframework.context.annotation.Bean; @@ -35,10 +39,13 @@ import static org.awaitility.Awaitility.*; @SpringJUnitConfig(initializers = ConfigDataApplicationContextInitializer.class) @TestPropertySource( properties = { - "consumer.bootstrap-server=${spring.embedded.kafka.brokers}", - "consumer.topic=" + TOPIC, - "consumer.commit-interval=500ms" }) + "sumup.adder.bootstrap-server=${spring.embedded.kafka.brokers}", + "sumup.adder.topic=" + TOPIC, + "sumup.adder.commit-interval=500ms", + "spring.mongodb.embedded.version=4.4.13" }) @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS) +@EnableAutoConfiguration +@AutoConfigureDataMongo @Slf4j abstract class GenericApplicationTests { @@ -53,11 +60,16 @@ abstract class GenericApplicationTests @Autowired ApplicationProperties applicationProperties; @Autowired + MongoClient mongoClient; + @Autowired + MongoProperties mongoProperties; + @Autowired + RebalanceListener rebalanceListener; + @Autowired TestRecordHandler recordHandler; @Autowired EndlessConsumer endlessConsumer; - KafkaProducer testRecordProducer; KafkaConsumer offsetConsumer; Map oldOffsets; @@ -328,6 +340,7 @@ abstract class GenericApplicationTests props.put("value.deserializer", BytesDeserializer.class.getName()); offsetConsumer = new KafkaConsumer<>(props); + mongoClient.getDatabase(mongoProperties.getDatabase()).drop(); seekToEnd(); oldOffsets = new HashMap<>();