X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2FGenericApplicationTests.java;h=21c3f7f01c524eb64ad88d204b9b7a82e4a65e22;hb=7adff476ad862d30d668d75212d1ca1c7cf16b03;hp=9175e52d21ef7078c24d6c5233e8d9e3893b67fa;hpb=1288af99aeb350661f8b0a60762cba8e1b0f6a24;p=demos%2Fkafka%2Ftraining diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java index 9175e52..21c3f7f 100644 --- a/src/test/java/de/juplo/kafka/GenericApplicationTests.java +++ b/src/test/java/de/juplo/kafka/GenericApplicationTests.java @@ -1,6 +1,8 @@ package de.juplo.kafka; +import com.mongodb.client.MongoClient; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; @@ -11,6 +13,11 @@ import org.apache.kafka.common.serialization.*; import org.apache.kafka.common.utils.Bytes; import org.junit.jupiter.api.*; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration; +import org.springframework.boot.autoconfigure.kafka.KafkaProperties; +import org.springframework.boot.autoconfigure.mongo.MongoProperties; +import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo; import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer; import org.springframework.boot.test.context.TestConfiguration; import org.springframework.context.annotation.Import; @@ -32,13 +39,20 @@ import static org.assertj.core.api.Assertions.*; import static org.awaitility.Awaitility.*; -@SpringJUnitConfig(initializers = ConfigDataApplicationContextInitializer.class) +@SpringJUnitConfig( + initializers = ConfigDataApplicationContextInitializer.class, + classes = { + KafkaAutoConfiguration.class, + ApplicationTests.Configuration.class }) @TestPropertySource( properties = { - "consumer.bootstrap-server=${spring.embedded.kafka.brokers}", - "consumer.topic=" + TOPIC, - "consumer.commit-interval=1s" }) + "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}", + "sumup.adder.topic=" + TOPIC, + "spring.kafka.consumer.auto-commit-interval=500ms", + "spring.mongodb.embedded.version=4.4.13" }) @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS) +@EnableAutoConfiguration +@AutoConfigureDataMongo @Slf4j abstract class GenericApplicationTests { @@ -47,19 +61,29 @@ abstract class GenericApplicationTests @Autowired - KafkaConsumer kafkaConsumer; + org.apache.kafka.clients.consumer.Consumer kafkaConsumer; @Autowired Consumer> consumer; @Autowired - ApplicationProperties properties; + ApplicationProperties applicationProperties; + @Autowired + KafkaProperties kafkaProperties; @Autowired ExecutorService executor; + @Autowired + MongoClient mongoClient; + @Autowired + MongoProperties mongoProperties; + @Autowired + ConsumerRebalanceListener rebalanceListener; + @Autowired + RecordHandler recordHandler; KafkaProducer testRecordProducer; KafkaConsumer offsetConsumer; EndlessConsumer endlessConsumer; Map oldOffsets; - Map newOffsets; + Map seenOffsets; Set> receivedRecords; @@ -76,7 +100,7 @@ abstract class GenericApplicationTests /** Tests methods */ @Test - void commitsCurrentOffsetsOnSuccess() + void commitsCurrentOffsetsOnSuccess() throws Exception { int numberOfGeneratedMessages = recordGenerator.generate(false, false, messageSender); @@ -92,12 +116,15 @@ abstract class GenericApplicationTests .untilAsserted(() -> { checkSeenOffsetsForProgress(); - compareToCommitedOffsets(newOffsets); + assertSeenOffsetsEqualCommittedOffsets(seenOffsets); }); assertThatExceptionOfType(IllegalStateException.class) .isThrownBy(() -> endlessConsumer.exitStatus()) .describedAs("Consumer should still be running"); + + endlessConsumer.stop(); + recordGenerator.assertBusinessLogic(); } @Test @@ -113,7 +140,7 @@ abstract class GenericApplicationTests .until(() -> !endlessConsumer.running()); checkSeenOffsetsForProgress(); - compareToCommitedOffsets(newOffsets); + assertSeenOffsetsEqualCommittedOffsets(seenOffsets); endlessConsumer.start(); await("Consumer failed") @@ -122,7 +149,7 @@ abstract class GenericApplicationTests .until(() -> !endlessConsumer.running()); checkSeenOffsetsForProgress(); - compareToCommitedOffsets(newOffsets); + assertSeenOffsetsEqualCommittedOffsets(seenOffsets); assertThat(receivedRecords.size()) .describedAs("Received not all sent events") .isLessThan(numberOfGeneratedMessages); @@ -133,6 +160,8 @@ abstract class GenericApplicationTests assertThat(endlessConsumer.exitStatus()) .describedAs("Consumer should have exited abnormally") .containsInstanceOf(RecordDeserializationException.class); + + recordGenerator.assertBusinessLogic(); } @Test @@ -148,7 +177,7 @@ abstract class GenericApplicationTests .until(() -> !endlessConsumer.running()); checkSeenOffsetsForProgress(); - compareToCommitedOffsets(oldOffsets); + assertSeenOffsetsAreBehindCommittedOffsets(seenOffsets); endlessConsumer.start(); await("Consumer failed") @@ -156,11 +185,7 @@ abstract class GenericApplicationTests .pollInterval(Duration.ofSeconds(1)) .until(() -> !endlessConsumer.running()); - checkSeenOffsetsForProgress(); - compareToCommitedOffsets(oldOffsets); - assertThat(receivedRecords.size()) - .describedAs("Received not all sent events") - .isLessThan(numberOfGeneratedMessages); + assertSeenOffsetsAreBehindCommittedOffsets(seenOffsets); assertThatNoException() .describedAs("Consumer should not be running") @@ -168,23 +193,44 @@ abstract class GenericApplicationTests assertThat(endlessConsumer.exitStatus()) .describedAs("Consumer should have exited abnormally") .containsInstanceOf(RuntimeException.class); + + recordGenerator.assertBusinessLogic(); } /** Helper methods for the verification of expectations */ - void compareToCommitedOffsets(Map offsetsToCheck) + void assertSeenOffsetsEqualCommittedOffsets(Map offsetsToCheck) { doForCurrentOffsets((tp, offset) -> { Long expected = offsetsToCheck.get(tp) + 1; - log.debug("Checking, if the offset for {} is {}", tp, expected); + log.debug("Checking, if the offset {} for {} is exactly {}", offset, tp, expected); assertThat(offset) .describedAs("Committed offset corresponds to the offset of the consumer") .isEqualTo(expected); }); } + void assertSeenOffsetsAreBehindCommittedOffsets(Map offsetsToCheck) + { + List isOffsetBehindSeen = new LinkedList<>(); + + doForCurrentOffsets((tp, offset) -> + { + Long expected = offsetsToCheck.get(tp) + 1; + log.debug("Checking, if the offset {} for {} is at most {}", offset, tp, expected); + assertThat(offset) + .describedAs("Committed offset must be at most equal to the offset of the consumer") + .isLessThanOrEqualTo(expected); + isOffsetBehindSeen.add(offset < expected); + }); + + assertThat(isOffsetBehindSeen.stream().reduce(false, (result, next) -> result | next)) + .describedAs("Committed offsets are behind seen offsets") + .isTrue(); + } + void checkSeenOffsetsForProgress() { // Be sure, that some messages were consumed...! @@ -192,7 +238,7 @@ abstract class GenericApplicationTests partitions().forEach(tp -> { Long oldOffset = oldOffsets.get(tp) + 1; - Long newOffset = newOffsets.get(tp) + 1; + Long newOffset = seenOffsets.get(tp) + 1; if (!oldOffset.equals(newOffset)) { log.debug("Progress for {}: {} -> {}", tp, oldOffset, newOffset); @@ -255,6 +301,11 @@ abstract class GenericApplicationTests { return true; } + + default void assertBusinessLogic() + { + log.debug("No business-logic to assert"); + } } void sendMessage(ProducerRecord record) @@ -287,48 +338,53 @@ abstract class GenericApplicationTests { Properties props; props = new Properties(); - props.put("bootstrap.servers", properties.getBootstrapServer()); + props.put("bootstrap.servers", kafkaProperties.getBootstrapServers()); props.put("linger.ms", 100); props.put("key.serializer", BytesSerializer.class.getName()); props.put("value.serializer", BytesSerializer.class.getName()); testRecordProducer = new KafkaProducer<>(props); props = new Properties(); - props.put("bootstrap.servers", properties.getBootstrapServer()); + props.put("bootstrap.servers", kafkaProperties.getBootstrapServers()); props.put("client.id", "OFFSET-CONSUMER"); - props.put("group.id", properties.getGroupId()); + props.put("group.id", kafkaProperties.getConsumer().getGroupId()); props.put("key.deserializer", BytesDeserializer.class.getName()); props.put("value.deserializer", BytesDeserializer.class.getName()); offsetConsumer = new KafkaConsumer<>(props); + mongoClient.getDatabase(mongoProperties.getDatabase()).drop(); seekToEnd(); oldOffsets = new HashMap<>(); - newOffsets = new HashMap<>(); + seenOffsets = new HashMap<>(); receivedRecords = new HashSet<>(); doForCurrentOffsets((tp, offset) -> { oldOffsets.put(tp, offset - 1); - newOffsets.put(tp, offset - 1); + seenOffsets.put(tp, offset - 1); }); - Consumer> captureOffsetAndExecuteTestHandler = - record -> + TestRecordHandler captureOffsetAndExecuteTestHandler = + new TestRecordHandler(recordHandler) { - newOffsets.put( - new TopicPartition(record.topic(), record.partition()), - record.offset()); - receivedRecords.add(record); - consumer.accept(record); + @Override + public void onNewRecord(ConsumerRecord record) + { + seenOffsets.put( + new TopicPartition(record.topic(), record.partition()), + record.offset()); + receivedRecords.add(record); + } }; endlessConsumer = new EndlessConsumer<>( executor, - properties.getClientId(), - properties.getTopic(), + kafkaProperties.getClientId(), + applicationProperties.getTopic(), kafkaConsumer, + rebalanceListener, captureOffsetAndExecuteTestHandler); endlessConsumer.start(); @@ -339,7 +395,6 @@ abstract class GenericApplicationTests { try { - endlessConsumer.stop(); testRecordProducer.close(); offsetConsumer.close(); }