X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2FGenericApplicationTests.java;h=49ddb47e1f2db54dc9cf3e570c54a14cbfe5591a;hb=0c9a0c1cf9a0065012743efcd940d8721bc33c20;hp=937b40ffafed0d1ab56c99f7ecd22499977cc89a;hpb=2eb3c45c9438a20777b0110defa593dd45c64511;p=demos%2Fkafka%2Ftraining diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java index 937b40f..49ddb47 100644 --- a/src/test/java/de/juplo/kafka/GenericApplicationTests.java +++ b/src/test/java/de/juplo/kafka/GenericApplicationTests.java @@ -12,7 +12,6 @@ import org.apache.kafka.common.utils.Bytes; import org.junit.jupiter.api.*; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; -import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.boot.autoconfigure.mongo.MongoProperties; import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo; @@ -20,6 +19,8 @@ import org.springframework.boot.test.context.ConfigDataApplicationContextInitial import org.springframework.boot.test.context.TestConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Import; +import org.springframework.kafka.config.KafkaListenerEndpointRegistry; +import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.test.context.TestPropertySource; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; @@ -37,11 +38,7 @@ import static org.assertj.core.api.Assertions.*; import static org.awaitility.Awaitility.*; -@SpringJUnitConfig( - initializers = ConfigDataApplicationContextInitializer.class, - classes = { - KafkaAutoConfiguration.class, - ApplicationTests.Configuration.class }) +@SpringJUnitConfig(initializers = ConfigDataApplicationContextInitializer.class) @TestPropertySource( properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}", @@ -69,9 +66,11 @@ abstract class GenericApplicationTests @Autowired MongoProperties mongoProperties; @Autowired - TestRecordHandler recordHandler; + KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry; @Autowired - EndlessConsumer endlessConsumer; + TestRecordHandler recordHandler; + @Autowired + EndlessConsumer endlessConsumer; KafkaProducer testRecordProducer; KafkaConsumer offsetConsumer; @@ -99,7 +98,7 @@ abstract class GenericApplicationTests await(numberOfGeneratedMessages + " records received") .atMost(Duration.ofSeconds(30)) .pollInterval(Duration.ofSeconds(1)) - .until(() -> recordHandler.receivedRecords.size() >= numberOfGeneratedMessages); + .until(() -> recordHandler.receivedMessages >= numberOfGeneratedMessages); await("Offsets committed") .atMost(Duration.ofSeconds(10)) @@ -141,7 +140,7 @@ abstract class GenericApplicationTests checkSeenOffsetsForProgress(); assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets); - assertThat(recordHandler.receivedRecords.size()) + assertThat(recordHandler.receivedMessages) .describedAs("Received not all sent events") .isLessThan(numberOfGeneratedMessages); @@ -157,7 +156,7 @@ abstract class GenericApplicationTests @Test @SkipWhenErrorCannotBeGenerated(logicError = true) - void doesNotCommitOffsetsOnLogicError() + void commitsOffsetsOfUnseenRecordsOnLogicError() { int numberOfGeneratedMessages = recordGenerator.generate(false, true, messageSender); @@ -168,7 +167,7 @@ abstract class GenericApplicationTests .until(() -> !endlessConsumer.running()); checkSeenOffsetsForProgress(); - assertSeenOffsetsAreBehindCommittedOffsets(recordHandler.seenOffsets); + assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets); endlessConsumer.start(); await("Consumer failed") @@ -176,7 +175,7 @@ abstract class GenericApplicationTests .pollInterval(Duration.ofSeconds(1)) .until(() -> !endlessConsumer.running()); - assertSeenOffsetsAreBehindCommittedOffsets(recordHandler.seenOffsets); + assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets); assertThatNoException() .describedAs("Consumer should not be running") @@ -348,7 +347,7 @@ abstract class GenericApplicationTests oldOffsets = new HashMap<>(); recordHandler.seenOffsets = new HashMap<>(); - recordHandler.receivedRecords = new HashSet<>(); + recordHandler.receivedMessages = 0; doForCurrentOffsets((tp, offset) -> { @@ -392,5 +391,11 @@ abstract class GenericApplicationTests { return new TestRecordHandler(applicationRecordHandler); } + + @Bean(destroyMethod = "close") + public org.apache.kafka.clients.consumer.Consumer kafkaConsumer(ConsumerFactory factory) + { + return factory.createConsumer(); + } } }