X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2FGenericApplicationTests.java;h=93daf6ba490414763fbc76f460130272096d102d;hb=600b0b10f8e98bebef80f75e391a78c459ffb45c;hp=711a44a537175eec6547a5f315b4581ecdf8d7f6;hpb=c9d7601fc551069cd3a77da06a6333f22101a8a0;p=demos%2Fkafka%2Ftraining diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java index 711a44a..93daf6b 100644 --- a/src/test/java/de/juplo/kafka/GenericApplicationTests.java +++ b/src/test/java/de/juplo/kafka/GenericApplicationTests.java @@ -1,5 +1,6 @@ package de.juplo.kafka; +import com.mongodb.client.MongoClient; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -12,6 +13,7 @@ import org.apache.kafka.common.utils.Bytes; import org.junit.jupiter.api.*; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.autoconfigure.mongo.MongoProperties; import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo; import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer; import org.springframework.boot.test.context.TestConfiguration; @@ -39,7 +41,7 @@ import static org.awaitility.Awaitility.*; properties = { "sumup.adder.bootstrap-server=${spring.embedded.kafka.brokers}", "sumup.adder.topic=" + TOPIC, - "sumup.adder.commit-interval=1s", + "sumup.adder.commit-interval=500ms", "spring.mongodb.embedded.version=4.4.13" }) @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS) @EnableAutoConfiguration @@ -60,7 +62,9 @@ abstract class GenericApplicationTests @Autowired ExecutorService executor; @Autowired - PartitionStatisticsRepository partitionStatisticsRepository; + MongoClient mongoClient; + @Autowired + MongoProperties mongoProperties; @Autowired PollIntervalAwareConsumerRebalanceListener rebalanceListener; @Autowired @@ -173,9 +177,6 @@ abstract class GenericApplicationTests checkSeenOffsetsForProgress(); compareToCommitedOffsets(oldOffsets); - assertThat(receivedRecords.size()) - .describedAs("Received not all sent events") - .isLessThan(numberOfGeneratedMessages); assertThatNoException() .describedAs("Consumer should not be running") @@ -227,29 +228,23 @@ abstract class GenericApplicationTests void seekToEnd() { offsetConsumer.assign(partitions()); + offsetConsumer.seekToEnd(partitions()); partitions().forEach(tp -> { + // seekToEnd() works lazily: it only takes effect on poll()/position() Long offset = offsetConsumer.position(tp); log.info("New position for {}: {}", tp, offset); - Integer partition = tp.partition(); - StateDocument document = - partitionStatisticsRepository - .findById(partition.toString()) - .orElse(new StateDocument(partition)); - document.offset = offset; - partitionStatisticsRepository.save(document); }); + // The new positions must be commited! + offsetConsumer.commitSync(); offsetConsumer.unsubscribe(); } void doForCurrentOffsets(BiConsumer consumer) { - partitions().forEach(tp -> - { - String partition = Integer.toString(tp.partition()); - Optional offset = partitionStatisticsRepository.findById(partition).map(document -> document.offset); - consumer.accept(tp, offset.orElse(0l)); - }); + offsetConsumer.assign(partitions()); + partitions().forEach(tp -> consumer.accept(tp, offsetConsumer.position(tp))); + offsetConsumer.unsubscribe(); } List partitions() @@ -354,6 +349,8 @@ abstract class GenericApplicationTests } }; + mongoClient.getDatabase(mongoProperties.getDatabase()).drop(); + endlessConsumer = new EndlessConsumer<>( executor,