X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2FGenericApplicationTests.java;h=595ef89d29001fbc39a34cf1d757e8ecadc7d69f;hb=2d25525ef70a90709edc48bd9542d1b08a2888a2;hp=0f40058b2e8ed93582e0456e9aa1be9c5b3f5d17;hpb=e11c6152c721440d4a599a6f5fe0fe46f2283f31;p=demos%2Fkafka%2Ftraining diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java index 0f40058..595ef89 100644 --- a/src/test/java/de/juplo/kafka/GenericApplicationTests.java +++ b/src/test/java/de/juplo/kafka/GenericApplicationTests.java @@ -2,6 +2,7 @@ package de.juplo.kafka; import com.mongodb.client.MongoClient; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; @@ -62,13 +63,11 @@ abstract class GenericApplicationTests @Autowired ExecutorService executor; @Autowired - StateRepository stateRepository; - @Autowired MongoClient mongoClient; @Autowired MongoProperties mongoProperties; @Autowired - PollIntervalAwareConsumerRebalanceListener rebalanceListener; + ConsumerRebalanceListener rebalanceListener; @Autowired RecordHandler recordHandler; @@ -93,7 +92,7 @@ abstract class GenericApplicationTests /** Tests methods */ @Test - void commitsCurrentOffsetsOnSuccess() + void commitsCurrentOffsetsOnSuccess() throws Exception { int numberOfGeneratedMessages = recordGenerator.generate(false, false, messageSender); @@ -116,6 +115,7 @@ abstract class GenericApplicationTests .isThrownBy(() -> endlessConsumer.exitStatus()) .describedAs("Consumer should still be running"); + endlessConsumer.stop(); recordGenerator.assertBusinessLogic(); } @@ -248,29 +248,23 @@ abstract class GenericApplicationTests void seekToEnd() { offsetConsumer.assign(partitions()); + offsetConsumer.seekToEnd(partitions()); partitions().forEach(tp -> { + // seekToEnd() works lazily: it only takes effect on poll()/position() Long offset = offsetConsumer.position(tp); log.info("New position for {}: {}", tp, offset); - Integer partition = tp.partition(); - StateDocument document = - stateRepository - .findById(partition.toString()) - .orElse(new StateDocument(partition)); - document.offset = offset; - stateRepository.save(document); }); + // The new positions must be commited! + offsetConsumer.commitSync(); offsetConsumer.unsubscribe(); } void doForCurrentOffsets(BiConsumer consumer) { - partitions().forEach(tp -> - { - String partition = Integer.toString(tp.partition()); - Optional offset = stateRepository.findById(partition).map(document -> document.offset); - consumer.accept(tp, offset.orElse(0l)); - }); + offsetConsumer.assign(partitions()); + partitions().forEach(tp -> consumer.accept(tp, offsetConsumer.position(tp))); + offsetConsumer.unsubscribe(); } List partitions() @@ -393,7 +387,6 @@ abstract class GenericApplicationTests { try { - endlessConsumer.stop(); testRecordProducer.close(); offsetConsumer.close(); }