X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2FGenericApplicationTests.java;h=93daf6ba490414763fbc76f460130272096d102d;hb=600b0b10f8e98bebef80f75e391a78c459ffb45c;hp=9175e52d21ef7078c24d6c5233e8d9e3893b67fa;hpb=1288af99aeb350661f8b0a60762cba8e1b0f6a24;p=demos%2Fkafka%2Ftraining

diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java
index 9175e52..93daf6b 100644
--- a/src/test/java/de/juplo/kafka/GenericApplicationTests.java
+++ b/src/test/java/de/juplo/kafka/GenericApplicationTests.java
@@ -1,5 +1,6 @@
 package de.juplo.kafka;
 
+import com.mongodb.client.MongoClient;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -11,6 +12,9 @@ import org.apache.kafka.common.serialization.*;
 import org.apache.kafka.common.utils.Bytes;
 import org.junit.jupiter.api.*;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+import org.springframework.boot.autoconfigure.mongo.MongoProperties;
+import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo;
 import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer;
 import org.springframework.boot.test.context.TestConfiguration;
 import org.springframework.context.annotation.Import;
@@ -35,10 +39,13 @@ import static org.awaitility.Awaitility.*;
 @SpringJUnitConfig(initializers = ConfigDataApplicationContextInitializer.class)
 @TestPropertySource(
 		properties = {
-				"consumer.bootstrap-server=${spring.embedded.kafka.brokers}",
-				"consumer.topic=" + TOPIC,
-				"consumer.commit-interval=1s" })
+				"sumup.adder.bootstrap-server=${spring.embedded.kafka.brokers}",
+				"sumup.adder.topic=" + TOPIC,
+				"sumup.adder.commit-interval=500ms",
+				"spring.mongodb.embedded.version=4.4.13" })
 @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
+@EnableAutoConfiguration
+@AutoConfigureDataMongo
 @Slf4j
 abstract class GenericApplicationTests<K, V>
 {
@@ -54,6 +61,14 @@ abstract class GenericApplicationTests<K, V>
 	ApplicationProperties properties;
 	@Autowired
 	ExecutorService executor;
+	@Autowired
+	MongoClient mongoClient;
+	@Autowired
+	MongoProperties mongoProperties;
+	@Autowired
+	PollIntervalAwareConsumerRebalanceListener rebalanceListener;
+	@Autowired
+	RecordHandler<K, V> recordHandler;
 
 	KafkaProducer<Bytes, Bytes> testRecordProducer;
 	KafkaConsumer<Bytes, Bytes> offsetConsumer;
@@ -98,6 +113,8 @@ abstract class GenericApplicationTests<K, V>
 		assertThatExceptionOfType(IllegalStateException.class)
 				.isThrownBy(() -> endlessConsumer.exitStatus())
 				.describedAs("Consumer should still be running");
+
+		recordGenerator.assertBusinessLogic();
 	}
 
 	@Test
@@ -133,6 +150,8 @@ abstract class GenericApplicationTests<K, V>
 		assertThat(endlessConsumer.exitStatus())
 				.describedAs("Consumer should have exited abnormally")
 				.containsInstanceOf(RecordDeserializationException.class);
+
+		recordGenerator.assertBusinessLogic();
 	}
 
 	@Test
@@ -158,9 +177,6 @@ abstract class GenericApplicationTests<K, V>
 		checkSeenOffsetsForProgress();
 		compareToCommitedOffsets(oldOffsets);
 
-		assertThat(receivedRecords.size())
-				.describedAs("Received not all sent events")
-				.isLessThan(numberOfGeneratedMessages);
 
 		assertThatNoException()
 				.describedAs("Consumer should not be running")
@@ -168,6 +184,8 @@ abstract class GenericApplicationTests<K, V>
 		assertThat(endlessConsumer.exitStatus())
 				.describedAs("Consumer should have exited abnormally")
 				.containsInstanceOf(RuntimeException.class);
+
+		recordGenerator.assertBusinessLogic();
 	}
 
 
@@ -255,6 +273,11 @@ abstract class GenericApplicationTests<K, V>
 		{
 			return true;
 		}
+
+		default void assertBusinessLogic()
+		{
+			log.debug("No business-logic to assert");
+		}
 	}
 
 	void sendMessage(ProducerRecord<Bytes, Bytes> record)
@@ -313,22 +336,28 @@ abstract class GenericApplicationTests<K, V>
 			newOffsets.put(tp, offset - 1);
 		});
 
-		Consumer<ConsumerRecord<K, V>> captureOffsetAndExecuteTestHandler =
-				record ->
+		TestRecordHandler<K, V> captureOffsetAndExecuteTestHandler =
+				new TestRecordHandler<K, V>(recordHandler)
 				{
-					newOffsets.put(
-							new TopicPartition(record.topic(), record.partition()),
-							record.offset());
-					receivedRecords.add(record);
-					consumer.accept(record);
+					@Override
+					public void onNewRecord(ConsumerRecord<K, V> record)
+					{
+						newOffsets.put(
+								new TopicPartition(record.topic(), record.partition()),
+								record.offset());
+						receivedRecords.add(record);
+					}
 				};
 
+		mongoClient.getDatabase(mongoProperties.getDatabase()).drop();
+
 		endlessConsumer =
 				new EndlessConsumer<>(
 						executor,
 						properties.getClientId(),
 						properties.getTopic(),
 						kafkaConsumer,
+						rebalanceListener,
 						captureOffsetAndExecuteTestHandler);
 
 		endlessConsumer.start();
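
Note: the last hunk wraps the autowired production RecordHandler in an anonymous TestRecordHandler subclass whose definition is not part of this diff. A minimal sketch of such a delegating handler, assuming RecordHandler<K, V> extends java.util.function.Consumer<ConsumerRecord<K, V>> (which the replaced lambda of type Consumer<ConsumerRecord<K, V>> suggests) — the repository's actual class may differ:

	package de.juplo.kafka;

	import org.apache.kafka.clients.consumer.ConsumerRecord;

	// Sketch of a decorator around the production RecordHandler: it exposes
	// a per-record test hook and then delegates to the wrapped handler.
	public abstract class TestRecordHandler<K, V> implements RecordHandler<K, V>
	{
		private final RecordHandler<K, V> handler;

		public TestRecordHandler(RecordHandler<K, V> handler)
		{
			this.handler = handler;
		}

		// Test hook: invoked once per polled record, before the delegate runs
		public abstract void onNewRecord(ConsumerRecord<K, V> record);

		@Override
		public void accept(ConsumerRecord<K, V> record)
		{
			this.onNewRecord(record);
			handler.accept(record);
		}
	}

This decorator shape lets the test capture offsets and received records for every polled message (as the anonymous subclass in the hunk does) while the production handler still processes each record unchanged.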