X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2FGenericApplicationTests.java;h=a8fa7ea22b9c21d7c16fad357cc1eff27a130daa;hb=0cb776f6a698fda66b655a769827864359f69cb1;hp=595ef89d29001fbc39a34cf1d757e8ecadc7d69f;hpb=2d25525ef70a90709edc48bd9542d1b08a2888a2;p=demos%2Fkafka%2Ftraining diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java index 595ef89..a8fa7ea 100644 --- a/src/test/java/de/juplo/kafka/GenericApplicationTests.java +++ b/src/test/java/de/juplo/kafka/GenericApplicationTests.java @@ -2,7 +2,6 @@ package de.juplo.kafka; import com.mongodb.client.MongoClient; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; @@ -63,11 +62,13 @@ abstract class GenericApplicationTests @Autowired ExecutorService executor; @Autowired + StateRepository stateRepository; + @Autowired MongoClient mongoClient; @Autowired MongoProperties mongoProperties; @Autowired - ConsumerRebalanceListener rebalanceListener; + RebalanceListener rebalanceListener; @Autowired RecordHandler recordHandler; @@ -213,7 +214,7 @@ abstract class GenericApplicationTests Long expected = offsetsToCheck.get(tp) + 1; log.debug("Checking, if the offset {} for {} is at most {}", offset, tp, expected); assertThat(offset) - .describedAs("Committed offset corresponds to the offset of the consumer") + .describedAs("Committed offset must be at most equal to the offset of the consumer") .isLessThanOrEqualTo(expected); isOffsetBehindSeen.add(offset < expected); }); @@ -248,23 +249,29 @@ abstract class GenericApplicationTests void seekToEnd() { offsetConsumer.assign(partitions()); - offsetConsumer.seekToEnd(partitions()); partitions().forEach(tp -> { - // seekToEnd() works lazily: it only takes effect on poll()/position() Long offset = offsetConsumer.position(tp); log.info("New position for {}: {}", tp, offset); + Integer partition = tp.partition(); + StateDocument document = + stateRepository + .findById(partition.toString()) + .orElse(new StateDocument(partition)); + document.offset = offset; + stateRepository.save(document); }); - // The new positions must be commited! - offsetConsumer.commitSync(); offsetConsumer.unsubscribe(); } void doForCurrentOffsets(BiConsumer consumer) { - offsetConsumer.assign(partitions()); - partitions().forEach(tp -> consumer.accept(tp, offsetConsumer.position(tp))); - offsetConsumer.unsubscribe(); + partitions().forEach(tp -> + { + String partition = Integer.toString(tp.partition()); + Optional offset = stateRepository.findById(partition).map(document -> document.offset); + consumer.accept(tp, offset.orElse(0l)); + }); } List partitions()