package de.juplo.kafka;
+import com.mongodb.client.MongoClient;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.junit.jupiter.api.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+import org.springframework.boot.autoconfigure.mongo.MongoProperties;
import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo;
import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer;
import org.springframework.boot.test.context.TestConfiguration;
@Autowired
ExecutorService executor;
@Autowired
+ MongoClient mongoClient;
+ @Autowired
+ MongoProperties mongoProperties;
+ @Autowired
PollIntervalAwareConsumerRebalanceListener rebalanceListener;
@Autowired
RecordHandler<K, V> recordHandler;
}
};
+ mongoClient.getDatabase(mongoProperties.getDatabase()).drop();
+
endlessConsumer =
new EndlessConsumer<>(
executor,