X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2FGenericApplicationTests.java;h=49ddb47e1f2db54dc9cf3e570c54a14cbfe5591a;hb=0c9a0c1cf9a0065012743efcd940d8721bc33c20;hp=0f40058b2e8ed93582e0456e9aa1be9c5b3f5d17;hpb=e11c6152c721440d4a599a6f5fe0fe46f2283f31;p=demos%2Fkafka%2Ftraining diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java index 0f40058..49ddb47 100644 --- a/src/test/java/de/juplo/kafka/GenericApplicationTests.java +++ b/src/test/java/de/juplo/kafka/GenericApplicationTests.java @@ -2,7 +2,6 @@ package de.juplo.kafka; import com.mongodb.client.MongoClient; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; @@ -13,18 +12,21 @@ import org.apache.kafka.common.utils.Bytes; import org.junit.jupiter.api.*; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.boot.autoconfigure.mongo.MongoProperties; import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo; import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer; import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Import; +import org.springframework.kafka.config.KafkaListenerEndpointRegistry; +import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.test.context.TestPropertySource; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; import java.time.Duration; import java.util.*; -import java.util.concurrent.ExecutorService; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -39,9 +41,9 @@ import static org.awaitility.Awaitility.*; @SpringJUnitConfig(initializers = ConfigDataApplicationContextInitializer.class) @TestPropertySource( properties = { - "sumup.adder.bootstrap-server=${spring.embedded.kafka.brokers}", + "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}", "sumup.adder.topic=" + TOPIC, - "sumup.adder.commit-interval=500ms", + "spring.kafka.consumer.auto-commit-interval=500ms", "spring.mongodb.embedded.version=4.4.13" }) @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS) @EnableAutoConfiguration @@ -54,30 +56,25 @@ abstract class GenericApplicationTests @Autowired - KafkaConsumer kafkaConsumer; + org.apache.kafka.clients.consumer.Consumer kafkaConsumer; @Autowired - Consumer> consumer; + KafkaProperties kafkaProperties; @Autowired - ApplicationProperties properties; - @Autowired - ExecutorService executor; - @Autowired - StateRepository stateRepository; + ApplicationProperties applicationProperties; @Autowired MongoClient mongoClient; @Autowired MongoProperties mongoProperties; @Autowired - PollIntervalAwareConsumerRebalanceListener rebalanceListener; + KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry; + @Autowired + TestRecordHandler recordHandler; @Autowired - RecordHandler recordHandler; + EndlessConsumer endlessConsumer; KafkaProducer testRecordProducer; KafkaConsumer offsetConsumer; - EndlessConsumer endlessConsumer; Map oldOffsets; - Map seenOffsets; - Set> receivedRecords; final RecordGenerator recordGenerator; @@ -93,7 +90,7 @@ abstract class GenericApplicationTests /** Tests methods */ @Test - void commitsCurrentOffsetsOnSuccess() + void commitsCurrentOffsetsOnSuccess() throws Exception { int numberOfGeneratedMessages = recordGenerator.generate(false, false, messageSender); @@ -101,7 +98,7 @@ abstract class GenericApplicationTests await(numberOfGeneratedMessages + " records received") .atMost(Duration.ofSeconds(30)) .pollInterval(Duration.ofSeconds(1)) - .until(() -> receivedRecords.size() >= numberOfGeneratedMessages); + .until(() -> recordHandler.receivedMessages >= numberOfGeneratedMessages); await("Offsets committed") .atMost(Duration.ofSeconds(10)) @@ -109,13 +106,14 @@ abstract class GenericApplicationTests .untilAsserted(() -> { checkSeenOffsetsForProgress(); - assertSeenOffsetsEqualCommittedOffsets(seenOffsets); + assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets); }); assertThatExceptionOfType(IllegalStateException.class) .isThrownBy(() -> endlessConsumer.exitStatus()) .describedAs("Consumer should still be running"); + endlessConsumer.stop(); recordGenerator.assertBusinessLogic(); } @@ -132,7 +130,7 @@ abstract class GenericApplicationTests .until(() -> !endlessConsumer.running()); checkSeenOffsetsForProgress(); - assertSeenOffsetsEqualCommittedOffsets(seenOffsets); + assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets); endlessConsumer.start(); await("Consumer failed") @@ -141,8 +139,8 @@ abstract class GenericApplicationTests .until(() -> !endlessConsumer.running()); checkSeenOffsetsForProgress(); - assertSeenOffsetsEqualCommittedOffsets(seenOffsets); - assertThat(receivedRecords.size()) + assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets); + assertThat(recordHandler.receivedMessages) .describedAs("Received not all sent events") .isLessThan(numberOfGeneratedMessages); @@ -158,7 +156,7 @@ abstract class GenericApplicationTests @Test @SkipWhenErrorCannotBeGenerated(logicError = true) - void doesNotCommitOffsetsOnLogicError() + void commitsOffsetsOfUnseenRecordsOnLogicError() { int numberOfGeneratedMessages = recordGenerator.generate(false, true, messageSender); @@ -169,7 +167,7 @@ abstract class GenericApplicationTests .until(() -> !endlessConsumer.running()); checkSeenOffsetsForProgress(); - assertSeenOffsetsAreBehindCommittedOffsets(seenOffsets); + assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets); endlessConsumer.start(); await("Consumer failed") @@ -177,7 +175,7 @@ abstract class GenericApplicationTests .pollInterval(Duration.ofSeconds(1)) .until(() -> !endlessConsumer.running()); - assertSeenOffsetsAreBehindCommittedOffsets(seenOffsets); + assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets); assertThatNoException() .describedAs("Consumer should not be running") @@ -213,7 +211,7 @@ abstract class GenericApplicationTests Long expected = offsetsToCheck.get(tp) + 1; log.debug("Checking, if the offset {} for {} is at most {}", offset, tp, expected); assertThat(offset) - .describedAs("Committed offset corresponds to the offset of the consumer") + .describedAs("Committed offset must be at most equal to the offset of the consumer") .isLessThanOrEqualTo(expected); isOffsetBehindSeen.add(offset < expected); }); @@ -230,7 +228,7 @@ abstract class GenericApplicationTests partitions().forEach(tp -> { Long oldOffset = oldOffsets.get(tp) + 1; - Long newOffset = seenOffsets.get(tp) + 1; + Long newOffset = recordHandler.seenOffsets.get(tp) + 1; if (!oldOffset.equals(newOffset)) { log.debug("Progress for {}: {} -> {}", tp, oldOffset, newOffset); @@ -248,29 +246,23 @@ abstract class GenericApplicationTests void seekToEnd() { offsetConsumer.assign(partitions()); + offsetConsumer.seekToEnd(partitions()); partitions().forEach(tp -> { + // seekToEnd() works lazily: it only takes effect on poll()/position() Long offset = offsetConsumer.position(tp); log.info("New position for {}: {}", tp, offset); - Integer partition = tp.partition(); - StateDocument document = - stateRepository - .findById(partition.toString()) - .orElse(new StateDocument(partition)); - document.offset = offset; - stateRepository.save(document); }); + // The new positions must be commited! + offsetConsumer.commitSync(); offsetConsumer.unsubscribe(); } void doForCurrentOffsets(BiConsumer consumer) { - partitions().forEach(tp -> - { - String partition = Integer.toString(tp.partition()); - Optional offset = stateRepository.findById(partition).map(document -> document.offset); - consumer.accept(tp, offset.orElse(0l)); - }); + offsetConsumer.assign(partitions()); + partitions().forEach(tp -> consumer.accept(tp, offsetConsumer.position(tp))); + offsetConsumer.unsubscribe(); } List partitions() @@ -336,16 +328,16 @@ abstract class GenericApplicationTests { Properties props; props = new Properties(); - props.put("bootstrap.servers", properties.getBootstrapServer()); + props.put("bootstrap.servers", kafkaProperties.getBootstrapServers()); props.put("linger.ms", 100); props.put("key.serializer", BytesSerializer.class.getName()); props.put("value.serializer", BytesSerializer.class.getName()); testRecordProducer = new KafkaProducer<>(props); props = new Properties(); - props.put("bootstrap.servers", properties.getBootstrapServer()); + props.put("bootstrap.servers", kafkaProperties.getBootstrapServers()); props.put("client.id", "OFFSET-CONSUMER"); - props.put("group.id", properties.getGroupId()); + props.put("group.id", kafkaProperties.getConsumer().getGroupId()); props.put("key.deserializer", BytesDeserializer.class.getName()); props.put("value.deserializer", BytesDeserializer.class.getName()); offsetConsumer = new KafkaConsumer<>(props); @@ -354,37 +346,15 @@ abstract class GenericApplicationTests seekToEnd(); oldOffsets = new HashMap<>(); - seenOffsets = new HashMap<>(); - receivedRecords = new HashSet<>(); + recordHandler.seenOffsets = new HashMap<>(); + recordHandler.receivedMessages = 0; doForCurrentOffsets((tp, offset) -> { oldOffsets.put(tp, offset - 1); - seenOffsets.put(tp, offset - 1); + recordHandler.seenOffsets.put(tp, offset - 1); }); - TestRecordHandler captureOffsetAndExecuteTestHandler = - new TestRecordHandler(recordHandler) - { - @Override - public void onNewRecord(ConsumerRecord record) - { - seenOffsets.put( - new TopicPartition(record.topic(), record.partition()), - record.offset()); - receivedRecords.add(record); - } - }; - - endlessConsumer = - new EndlessConsumer<>( - executor, - properties.getClientId(), - properties.getTopic(), - kafkaConsumer, - rebalanceListener, - captureOffsetAndExecuteTestHandler); - endlessConsumer.start(); } @@ -394,6 +364,14 @@ abstract class GenericApplicationTests try { endlessConsumer.stop(); + } + catch (Exception e) + { + log.debug("{}", e.toString()); + } + + try + { testRecordProducer.close(); offsetConsumer.close(); } @@ -408,5 +386,16 @@ abstract class GenericApplicationTests @Import(ApplicationConfiguration.class) public static class Configuration { + @Bean + public RecordHandler recordHandler(RecordHandler applicationRecordHandler) + { + return new TestRecordHandler(applicationRecordHandler); + } + + @Bean(destroyMethod = "close") + public org.apache.kafka.clients.consumer.Consumer kafkaConsumer(ConsumerFactory factory) + { + return factory.createConsumer(); + } } }