X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationTests.java;h=ee8e8c418d91ee08f92f21973e064a19dfbed75e;hb=4aef11c37556c827397af6c25f5c129cd0147a68;hp=6c25bcd7808409744fc28b7e742c14d37c23efac;hpb=c4fd031abdae00bdbd934216579f0a38ddd46783;p=demos%2Fkafka%2Ftraining diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index 6c25bcd..ee8e8c4 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -11,10 +11,12 @@ import org.apache.kafka.common.serialization.*; import org.apache.kafka.common.utils.Bytes; import org.junit.jupiter.api.*; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration; import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer; import org.springframework.boot.test.context.TestConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Import; +import org.springframework.context.annotation.Primary; import org.springframework.kafka.support.serializer.JsonSerializer; import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.test.context.TestPropertySource; @@ -23,7 +25,6 @@ import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; import java.time.Duration; import java.util.*; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; import java.util.function.BiConsumer; import java.util.function.BiFunction; import java.util.function.Consumer; @@ -36,7 +37,12 @@ import static org.assertj.core.api.Assertions.*; import static org.awaitility.Awaitility.*; -@SpringJUnitConfig(initializers = ConfigDataApplicationContextInitializer.class) +@SpringJUnitConfig( + initializers = ConfigDataApplicationContextInitializer.class, + classes = { + EndlessConsumer.class, + KafkaAutoConfiguration.class, + ApplicationTests.Configuration.class }) @TestMethodOrder(MethodOrderer.OrderAnnotation.class) @TestPropertySource( properties = { @@ -63,10 +69,10 @@ class ApplicationTests @Autowired ApplicationProperties properties; @Autowired - ExecutorService executor; + EndlessConsumer endlessConsumer; + @Autowired + RecordHandler recordHandler; - Consumer> testHandler; - EndlessConsumer endlessConsumer; Map oldOffsets; Map newOffsets; Set> receivedRecords; @@ -108,17 +114,8 @@ class ApplicationTests await("Consumer failed") .atMost(Duration.ofSeconds(30)) - .until(() -> !endlessConsumer.running()); - - checkSeenOffsetsForProgress(); - compareToCommitedOffsets(newOffsets); - - endlessConsumer.start(); - await("Consumer failed") - .atMost(Duration.ofSeconds(30)) - .until(() -> !endlessConsumer.running()); + .untilAsserted(() -> checkSeenOffsetsForProgress()); - checkSeenOffsetsForProgress(); compareToCommitedOffsets(newOffsets); assertThat(receivedRecords.size()) .describedAs("Received not all sent events") @@ -128,8 +125,8 @@ class ApplicationTests .describedAs("Consumer should not be running") .isThrownBy(() -> endlessConsumer.exitStatus()); assertThat(endlessConsumer.exitStatus()) - .describedAs("Consumer should have exited abnormally") - .containsInstanceOf(RecordDeserializationException.class); + .containsInstanceOf(RecordDeserializationException.class) + .describedAs("Consumer should have exited abnormally"); } @@ -240,7 +237,7 @@ class ApplicationTests @BeforeEach public void init() { - testHandler = record -> {} ; + recordHandler.testHandler = (record) -> {}; oldOffsets = new HashMap<>(); newOffsets = new HashMap<>(); @@ -252,24 +249,15 @@ class ApplicationTests newOffsets.put(tp, offset - 1); }); - Consumer> captureOffsetAndExecuteTestHandler = + recordHandler.captureOffsets = record -> { + receivedRecords.add(record); newOffsets.put( new TopicPartition(record.topic(), record.partition()), record.offset()); - receivedRecords.add(record); - testHandler.accept(record); }; - endlessConsumer = - new EndlessConsumer<>( - executor, - properties.getClientId(), - properties.getTopic(), - kafkaConsumer, - captureOffsetAndExecuteTestHandler); - endlessConsumer.start(); } @@ -286,11 +274,32 @@ class ApplicationTests } } + public static class RecordHandler implements Consumer> + { + Consumer> captureOffsets; + Consumer> testHandler; + + + @Override + public void accept(ConsumerRecord record) + { + captureOffsets + .andThen(testHandler) + .accept(record); + } + } @TestConfiguration @Import(ApplicationConfiguration.class) public static class Configuration { + @Primary + @Bean + public Consumer> testHandler() + { + return new RecordHandler(); + } + @Bean Serializer serializer() {