From: Kai Moritz Date: Wed, 17 Aug 2022 20:51:10 +0000 (+0200) Subject: ROT: Korrigierten/Verbesserten Test und Überarbeitetes Setup gemerged X-Git-Tag: sumup-adder---lvm-2-tage~10 X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=d2eb370acf1a2195c36421ffc471f67cb4a8e86e;hp=-c;p=demos%2Fkafka%2Ftraining ROT: Korrigierten/Verbesserten Test und Überarbeitetes Setup gemerged * Merge branch 'sumup-adder--ohne--stored-offsets' into sumup-adder. * In dem gemergten Branch ist es nicht wichtig, wann genau die Mongo-DB zwischen den Tests zurückgesetzt wird, da sie nur den Zustand des Consumers enthält. * Wenn die Offsets mit in der Mongo-DB gespeichert werden, ist es wesentlich, an zu welchem Zeitpunkt während der Test-Vorbereitung diese zurückgesetzt wird! * ROT: Der verbesserte/verschärfte Test deckt Fehler in der Test-Logik auf. --- d2eb370acf1a2195c36421ffc471f67cb4a8e86e diff --combined src/test/java/de/juplo/kafka/GenericApplicationTests.java index 9a6f812,93daf6b..b019373 --- a/src/test/java/de/juplo/kafka/GenericApplicationTests.java +++ b/src/test/java/de/juplo/kafka/GenericApplicationTests.java @@@ -1,5 -1,6 +1,6 @@@ package de.juplo.kafka; + import com.mongodb.client.MongoClient; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.KafkaConsumer; @@@ -12,6 -13,7 +13,7 @@@ import org.apache.kafka.common.utils.By import org.junit.jupiter.api.*; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; + import org.springframework.boot.autoconfigure.mongo.MongoProperties; import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo; import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer; import org.springframework.boot.test.context.TestConfiguration; @@@ -39,7 -41,7 +41,7 @@@ import static org.awaitility.Awaitility properties = { "sumup.adder.bootstrap-server=${spring.embedded.kafka.brokers}", "sumup.adder.topic=" + TOPIC, - "sumup.adder.commit-interval=1s", + "sumup.adder.commit-interval=500ms", "spring.mongodb.embedded.version=4.4.13" }) @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS) @EnableAutoConfiguration @@@ -60,8 -62,10 +62,12 @@@ abstract class GenericApplicationTests< @Autowired ExecutorService executor; @Autowired + StateRepository stateRepository; + @Autowired + MongoClient mongoClient; + @Autowired + MongoProperties mongoProperties; + @Autowired PollIntervalAwareConsumerRebalanceListener rebalanceListener; @Autowired RecordHandler recordHandler; @@@ -173,9 -177,6 +179,6 @@@ checkSeenOffsetsForProgress(); compareToCommitedOffsets(oldOffsets); - assertThat(receivedRecords.size()) - .describedAs("Received not all sent events") - .isLessThan(numberOfGeneratedMessages); assertThatNoException() .describedAs("Consumer should not be running") @@@ -227,29 -228,23 +230,29 @@@ void seekToEnd() { offsetConsumer.assign(partitions()); - offsetConsumer.seekToEnd(partitions()); partitions().forEach(tp -> { - // seekToEnd() works lazily: it only takes effect on poll()/position() Long offset = offsetConsumer.position(tp); log.info("New position for {}: {}", tp, offset); + Integer partition = tp.partition(); + StateDocument document = + stateRepository + .findById(partition.toString()) + .orElse(new StateDocument(partition)); + document.offset = offset; + stateRepository.save(document); }); - // The new positions must be commited! - offsetConsumer.commitSync(); offsetConsumer.unsubscribe(); } void doForCurrentOffsets(BiConsumer consumer) { - offsetConsumer.assign(partitions()); - partitions().forEach(tp -> consumer.accept(tp, offsetConsumer.position(tp))); - offsetConsumer.unsubscribe(); + partitions().forEach(tp -> + { + String partition = Integer.toString(tp.partition()); + Optional offset = stateRepository.findById(partition).map(document -> document.offset); + consumer.accept(tp, offset.orElse(0l)); + }); } List partitions() @@@ -329,6 -324,6 +332,7 @@@ props.put("value.deserializer", BytesDeserializer.class.getName()); offsetConsumer = new KafkaConsumer<>(props); ++ mongoClient.getDatabase(mongoProperties.getDatabase()).drop(); seekToEnd(); oldOffsets = new HashMap<>(); @@@ -354,6 -349,8 +358,6 @@@ } }; - mongoClient.getDatabase(mongoProperties.getDatabase()).drop(); - endlessConsumer = new EndlessConsumer<>( executor,