RED: Merged corrected/improved test and reworked setup
author Kai Moritz <kai@juplo.de>
Wed, 17 Aug 2022 20:51:10 +0000 (22:51 +0200)
committer Kai Moritz <kai@juplo.de>
Fri, 19 Aug 2022 09:49:19 +0000 (11:49 +0200)
* Merge branch 'sumup-adder--ohne--stored-offsets' into sumup-adder.
* In the merged branch it does not matter exactly when the MongoDB is reset
  between tests, because it only contains the state of the consumer.
* When the offsets are stored in the MongoDB as well, it is essential at
  which point during the test preparation the database is reset (see the
  sketch after this list)!
* RED: The improved/tightened test uncovers errors in the test logic.
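
A minimal sketch of the ordering constraint described above, using the names
from the diff below (mongoClient, mongoProperties, seekToEnd()); the wrapper
method prepareTest() is made up for illustration:

    void prepareTest()
    {
      // 1. Drop the database FIRST, so that the reset cannot wipe out the
      //    offsets that will be stored in the next step.
      mongoClient.getDatabase(mongoProperties.getDatabase()).drop();

      // 2. Only THEN write the current end offsets of the topic into the
      //    state documents, so that the consumer under test starts there.
      seekToEnd();
    }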

src/test/java/de/juplo/kafka/GenericApplicationTests.java

@@@ -1,5 -1,6 +1,6 @@@
  package de.juplo.kafka;
  
+ import com.mongodb.client.MongoClient;
  import lombok.extern.slf4j.Slf4j;
  import org.apache.kafka.clients.consumer.ConsumerRecord;
  import org.apache.kafka.clients.consumer.KafkaConsumer;
@@@ -12,6 -13,7 +13,7 @@@ import org.apache.kafka.common.utils.By
  import org.junit.jupiter.api.*;
  import org.springframework.beans.factory.annotation.Autowired;
  import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+ import org.springframework.boot.autoconfigure.mongo.MongoProperties;
  import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo;
  import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer;
  import org.springframework.boot.test.context.TestConfiguration;
@@@ -39,7 -41,7 +41,7 @@@ import static org.awaitility.Awaitility
                properties = {
                                "sumup.adder.bootstrap-server=${spring.embedded.kafka.brokers}",
                                "sumup.adder.topic=" + TOPIC,
-                               "sumup.adder.commit-interval=1s",
+                               "sumup.adder.commit-interval=500ms",
                                "spring.mongodb.embedded.version=4.4.13" })
  @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
  @EnableAutoConfiguration
@@@ -60,8 -62,10 +62,12 @@@ abstract class GenericApplicationTests<
        @Autowired
        ExecutorService executor;
        @Autowired
 +      StateRepository stateRepository;
 +      @Autowired
+       MongoClient mongoClient;
+       @Autowired
+       MongoProperties mongoProperties;
+       @Autowired
        PollIntervalAwareConsumerRebalanceListener rebalanceListener;
        @Autowired
        RecordHandler<K, V> recordHandler;
  
                checkSeenOffsetsForProgress();
                compareToCommitedOffsets(oldOffsets);
-               assertThat(receivedRecords.size())
-                               .describedAs("Received not all sent events")
-                               .isLessThan(numberOfGeneratedMessages);
  
                assertThatNoException()
                                .describedAs("Consumer should not be running")
        void seekToEnd()
        {
                offsetConsumer.assign(partitions());
 -              offsetConsumer.seekToEnd(partitions());
                partitions().forEach(tp ->
                {
 -                      // seekToEnd() works lazily: it only takes effect on poll()/position()
                        Long offset = offsetConsumer.position(tp);
                        log.info("New position for {}: {}", tp, offset);
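 +                      // persist the new end position as the stored offset of this partition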
 +                      Integer partition = tp.partition();
 +                      StateDocument document =
 +                                      stateRepository
 +                                                      .findById(partition.toString())
 +                                                      .orElse(new StateDocument(partition));
 +                      document.offset = offset;
 +                      stateRepository.save(document);
                });
 -              // The new positions must be committed!
 -              offsetConsumer.commitSync();
                offsetConsumer.unsubscribe();
        }
  
        void doForCurrentOffsets(BiConsumer<TopicPartition, Long> consumer)
        {
 -              offsetConsumer.assign(partitions());
 -              partitions().forEach(tp -> consumer.accept(tp, offsetConsumer.position(tp)));
 -              offsetConsumer.unsubscribe();
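 +              // read the start offsets from the state documents in MongoDB instead of from Kafka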
 +              partitions().forEach(tp ->
 +              {
 +                      String partition = Integer.toString(tp.partition());
 +                      Optional<Long> offset = stateRepository.findById(partition).map(document -> document.offset);
 +                      consumer.accept(tp, offset.orElse(0L));
 +              });
        }
  
        List<TopicPartition> partitions()
                props.put("value.deserializer", BytesDeserializer.class.getName());
                offsetConsumer = new KafkaConsumer<>(props);
  
++              mongoClient.getDatabase(mongoProperties.getDatabase()).drop();
                seekToEnd();
  
                oldOffsets = new HashMap<>();
                                        }
                                };
  
 -              mongoClient.getDatabase(mongoProperties.getDatabase()).drop();
 -
                endlessConsumer =
                                new EndlessConsumer<>(
                                                executor,