X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationTests.java;h=d7eb0398453a0229e15abacbc6d3c0903be842bd;hb=refs%2Fheads%2Frebalance-listener;hp=7f666f69b5412c242164b97c4c08ec6dae59008e;hpb=cfbe7dcd3318ee846cb5890eea4328e36c5aa364;p=demos%2Fkafka%2Ftraining diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index 7f666f6..d7eb039 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -12,7 +12,6 @@ import org.apache.kafka.common.utils.Bytes; import org.junit.jupiter.api.*; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; -import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo; import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer; import org.springframework.boot.test.context.TestConfiguration; import org.springframework.context.annotation.Bean; @@ -41,11 +40,9 @@ import static org.awaitility.Awaitility.*; properties = { "consumer.bootstrap-server=${spring.embedded.kafka.brokers}", "consumer.topic=" + TOPIC, - "consumer.commit-interval=1s", - "spring.mongodb.embedded.version=4.4.13" }) + "consumer.commit-interval=1s" }) @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS) @EnableAutoConfiguration -@AutoConfigureDataMongo @Slf4j class ApplicationTests { @@ -68,9 +65,9 @@ class ApplicationTests @Autowired ExecutorService executor; @Autowired - KeyCountingRebalanceListener keyCountingRebalanceListener; + ApplicationRebalanceListener rebalanceListener; @Autowired - KeyCountingRecordHandler keyCountingRecordHandler; + ApplicationRecordHandler recordHandler; EndlessConsumer endlessConsumer; Map oldOffsets; @@ -273,7 +270,7 @@ class ApplicationTests }); TestRecordHandler captureOffsetAndExecuteTestHandler = - new TestRecordHandler(keyCountingRecordHandler) { + new TestRecordHandler(recordHandler) { @Override public void onNewRecord(ConsumerRecord record) { @@ -290,7 +287,7 @@ class ApplicationTests properties.getClientId(), properties.getTopic(), kafkaConsumer, - keyCountingRebalanceListener, + rebalanceListener, captureOffsetAndExecuteTestHandler); endlessConsumer.start();