From 2eb3c45c9438a20777b0110defa593dd45c64511 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Fri, 9 Sep 2022 12:44:54 +0200 Subject: [PATCH] Der Test verwendet die `@Bean` von `EndlessConsumer` MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit * Per `git cherry-pick` vom Branch `deserialization` gepflückt. * Conflicts: ** src/main/java/de/juplo/kafka/ApplicationConfiguration.java ** src/test/java/de/juplo/kafka/ApplicationTests.java ** src/test/java/de/juplo/kafka/GenericApplicationTests.java --- .../juplo/kafka/ApplicationConfiguration.java | 7 +- .../juplo/kafka/GenericApplicationTests.java | 75 +++++++------------ .../de/juplo/kafka/TestRecordHandler.java | 16 +++- 3 files changed, 46 insertions(+), 52 deletions(-) diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index bae5d51..08c827c 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -18,7 +18,7 @@ import java.util.concurrent.Executors; public class ApplicationConfiguration { @Bean - public ApplicationRecordHandler recordHandler( + public ApplicationRecordHandler applicationRecordHandler( AdderResults adderResults, KafkaProperties kafkaProperties, ApplicationProperties applicationProperties) @@ -40,8 +40,7 @@ public class ApplicationConfiguration ApplicationRecordHandler recordHandler, AdderResults adderResults, StateRepository stateRepository, - KafkaProperties kafkaProperties, - ApplicationProperties applicationProperties) + KafkaProperties kafkaProperties) { return new ApplicationRebalanceListener( recordHandler, @@ -55,7 +54,7 @@ public class ApplicationConfiguration Consumer kafkaConsumer, ExecutorService executor, ApplicationRebalanceListener rebalanceListener, - ApplicationRecordHandler recordHandler, + RecordHandler recordHandler, KafkaProperties kafkaProperties, ApplicationProperties applicationProperties) { diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java index 21c3f7f..937b40f 100644 --- a/src/test/java/de/juplo/kafka/GenericApplicationTests.java +++ b/src/test/java/de/juplo/kafka/GenericApplicationTests.java @@ -2,8 +2,6 @@ package de.juplo.kafka; import com.mongodb.client.MongoClient; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; -import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; @@ -20,6 +18,7 @@ import org.springframework.boot.autoconfigure.mongo.MongoProperties; import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo; import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer; import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Import; import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.test.context.TestPropertySource; @@ -27,7 +26,6 @@ import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; import java.time.Duration; import java.util.*; -import java.util.concurrent.ExecutorService; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -63,28 +61,21 @@ abstract class GenericApplicationTests @Autowired org.apache.kafka.clients.consumer.Consumer kafkaConsumer; @Autowired - Consumer> consumer; - @Autowired - ApplicationProperties applicationProperties; - @Autowired KafkaProperties kafkaProperties; @Autowired - ExecutorService executor; + ApplicationProperties applicationProperties; @Autowired MongoClient mongoClient; @Autowired MongoProperties mongoProperties; @Autowired - ConsumerRebalanceListener rebalanceListener; + TestRecordHandler recordHandler; @Autowired - RecordHandler recordHandler; + EndlessConsumer endlessConsumer; KafkaProducer testRecordProducer; KafkaConsumer offsetConsumer; - EndlessConsumer endlessConsumer; Map oldOffsets; - Map seenOffsets; - Set> receivedRecords; final RecordGenerator recordGenerator; @@ -108,7 +99,7 @@ abstract class GenericApplicationTests await(numberOfGeneratedMessages + " records received") .atMost(Duration.ofSeconds(30)) .pollInterval(Duration.ofSeconds(1)) - .until(() -> receivedRecords.size() >= numberOfGeneratedMessages); + .until(() -> recordHandler.receivedRecords.size() >= numberOfGeneratedMessages); await("Offsets committed") .atMost(Duration.ofSeconds(10)) @@ -116,7 +107,7 @@ abstract class GenericApplicationTests .untilAsserted(() -> { checkSeenOffsetsForProgress(); - assertSeenOffsetsEqualCommittedOffsets(seenOffsets); + assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets); }); assertThatExceptionOfType(IllegalStateException.class) @@ -140,7 +131,7 @@ abstract class GenericApplicationTests .until(() -> !endlessConsumer.running()); checkSeenOffsetsForProgress(); - assertSeenOffsetsEqualCommittedOffsets(seenOffsets); + assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets); endlessConsumer.start(); await("Consumer failed") @@ -149,8 +140,8 @@ abstract class GenericApplicationTests .until(() -> !endlessConsumer.running()); checkSeenOffsetsForProgress(); - assertSeenOffsetsEqualCommittedOffsets(seenOffsets); - assertThat(receivedRecords.size()) + assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets); + assertThat(recordHandler.receivedRecords.size()) .describedAs("Received not all sent events") .isLessThan(numberOfGeneratedMessages); @@ -177,7 +168,7 @@ abstract class GenericApplicationTests .until(() -> !endlessConsumer.running()); checkSeenOffsetsForProgress(); - assertSeenOffsetsAreBehindCommittedOffsets(seenOffsets); + assertSeenOffsetsAreBehindCommittedOffsets(recordHandler.seenOffsets); endlessConsumer.start(); await("Consumer failed") @@ -185,7 +176,7 @@ abstract class GenericApplicationTests .pollInterval(Duration.ofSeconds(1)) .until(() -> !endlessConsumer.running()); - assertSeenOffsetsAreBehindCommittedOffsets(seenOffsets); + assertSeenOffsetsAreBehindCommittedOffsets(recordHandler.seenOffsets); assertThatNoException() .describedAs("Consumer should not be running") @@ -238,7 +229,7 @@ abstract class GenericApplicationTests partitions().forEach(tp -> { Long oldOffset = oldOffsets.get(tp) + 1; - Long newOffset = seenOffsets.get(tp) + 1; + Long newOffset = recordHandler.seenOffsets.get(tp) + 1; if (!oldOffset.equals(newOffset)) { log.debug("Progress for {}: {} -> {}", tp, oldOffset, newOffset); @@ -356,43 +347,30 @@ abstract class GenericApplicationTests seekToEnd(); oldOffsets = new HashMap<>(); - seenOffsets = new HashMap<>(); - receivedRecords = new HashSet<>(); + recordHandler.seenOffsets = new HashMap<>(); + recordHandler.receivedRecords = new HashSet<>(); doForCurrentOffsets((tp, offset) -> { oldOffsets.put(tp, offset - 1); - seenOffsets.put(tp, offset - 1); + recordHandler.seenOffsets.put(tp, offset - 1); }); - TestRecordHandler captureOffsetAndExecuteTestHandler = - new TestRecordHandler(recordHandler) - { - @Override - public void onNewRecord(ConsumerRecord record) - { - seenOffsets.put( - new TopicPartition(record.topic(), record.partition()), - record.offset()); - receivedRecords.add(record); - } - }; - - endlessConsumer = - new EndlessConsumer<>( - executor, - kafkaProperties.getClientId(), - applicationProperties.getTopic(), - kafkaConsumer, - rebalanceListener, - captureOffsetAndExecuteTestHandler); - endlessConsumer.start(); } @AfterEach public void deinit() { + try + { + endlessConsumer.stop(); + } + catch (Exception e) + { + log.debug("{}", e.toString()); + } + try { testRecordProducer.close(); @@ -409,5 +387,10 @@ abstract class GenericApplicationTests @Import(ApplicationConfiguration.class) public static class Configuration { + @Bean + public RecordHandler recordHandler(RecordHandler applicationRecordHandler) + { + return new TestRecordHandler(applicationRecordHandler); + } } } diff --git a/src/test/java/de/juplo/kafka/TestRecordHandler.java b/src/test/java/de/juplo/kafka/TestRecordHandler.java index b4efdd6..37d3f65 100644 --- a/src/test/java/de/juplo/kafka/TestRecordHandler.java +++ b/src/test/java/de/juplo/kafka/TestRecordHandler.java @@ -2,16 +2,28 @@ package de.juplo.kafka; import lombok.RequiredArgsConstructor; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.TopicPartition; + +import java.util.Map; +import java.util.Set; @RequiredArgsConstructor -public abstract class TestRecordHandler implements RecordHandler +public class TestRecordHandler implements RecordHandler { private final RecordHandler handler; + Map seenOffsets; + Set> receivedRecords; - public abstract void onNewRecord(ConsumerRecord record); + public void onNewRecord(ConsumerRecord record) + { + seenOffsets.put( + new TopicPartition(record.topic(), record.partition()), + record.offset()); + receivedRecords.add(record); + } @Override public void accept(ConsumerRecord record) -- 2.20.1