X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2FGenericApplicationTests.java;h=e16aea7c3c21d26b1ae14c1c8b3ba67a5bcd468c;hb=595eab489c638b07072f6ec7e3c6f52000295931;hp=8124c81aa87d6e4e65e4720a55a375816dc23cd2;hpb=627763878b235ba168c7f55a1ef448851b027bfc;p=demos%2Fkafka%2Ftraining diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java index 8124c81..e16aea7 100644 --- a/src/test/java/de/juplo/kafka/GenericApplicationTests.java +++ b/src/test/java/de/juplo/kafka/GenericApplicationTests.java @@ -17,6 +17,7 @@ import org.springframework.boot.autoconfigure.mongo.MongoProperties; import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo; import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer; import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Import; import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.test.context.TestPropertySource; @@ -24,7 +25,6 @@ import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; import java.time.Duration; import java.util.*; -import java.util.concurrent.ExecutorService; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -54,13 +54,11 @@ abstract class GenericApplicationTests @Autowired - KafkaConsumer kafkaConsumer; + org.apache.kafka.clients.consumer.Consumer kafkaConsumer; @Autowired Consumer> consumer; @Autowired - ApplicationProperties properties; - @Autowired - ExecutorService executor; + ApplicationProperties applicationProperties; @Autowired MongoClient mongoClient; @Autowired @@ -68,14 +66,13 @@ abstract class GenericApplicationTests @Autowired RebalanceListener rebalanceListener; @Autowired - RecordHandler recordHandler; + TestRecordHandler recordHandler; + @Autowired + EndlessConsumer endlessConsumer; KafkaProducer testRecordProducer; KafkaConsumer offsetConsumer; - EndlessConsumer endlessConsumer; Map oldOffsets; - Map seenOffsets; - Set> receivedRecords; final RecordGenerator recordGenerator; @@ -99,7 +96,7 @@ abstract class GenericApplicationTests await(numberOfGeneratedMessages + " records received") .atMost(Duration.ofSeconds(30)) .pollInterval(Duration.ofSeconds(1)) - .until(() -> receivedRecords.size() >= numberOfGeneratedMessages); + .until(() -> recordHandler.receivedRecords.size() >= numberOfGeneratedMessages); await("Offsets committed") .atMost(Duration.ofSeconds(10)) @@ -107,7 +104,7 @@ abstract class GenericApplicationTests .untilAsserted(() -> { checkSeenOffsetsForProgress(); - assertSeenOffsetsEqualCommittedOffsets(seenOffsets); + assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets); }); assertThatExceptionOfType(IllegalStateException.class) @@ -131,7 +128,7 @@ abstract class GenericApplicationTests .until(() -> !endlessConsumer.running()); checkSeenOffsetsForProgress(); - assertSeenOffsetsEqualCommittedOffsets(seenOffsets); + assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets); endlessConsumer.start(); await("Consumer failed") @@ -140,8 +137,8 @@ abstract class GenericApplicationTests .until(() -> !endlessConsumer.running()); checkSeenOffsetsForProgress(); - assertSeenOffsetsEqualCommittedOffsets(seenOffsets); - assertThat(receivedRecords.size()) + assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets); + assertThat(recordHandler.receivedRecords.size()) .describedAs("Received not all sent events") .isLessThan(numberOfGeneratedMessages); @@ -168,7 +165,7 @@ abstract class GenericApplicationTests .until(() -> !endlessConsumer.running()); checkSeenOffsetsForProgress(); - assertSeenOffsetsAreBehindCommittedOffsets(seenOffsets); + assertSeenOffsetsAreBehindCommittedOffsets(recordHandler.seenOffsets); endlessConsumer.start(); await("Consumer failed") @@ -176,7 +173,7 @@ abstract class GenericApplicationTests .pollInterval(Duration.ofSeconds(1)) .until(() -> !endlessConsumer.running()); - assertSeenOffsetsAreBehindCommittedOffsets(seenOffsets); + assertSeenOffsetsAreBehindCommittedOffsets(recordHandler.seenOffsets); assertThatNoException() .describedAs("Consumer should not be running") @@ -229,7 +226,7 @@ abstract class GenericApplicationTests partitions().forEach(tp -> { Long oldOffset = oldOffsets.get(tp) + 1; - Long newOffset = seenOffsets.get(tp) + 1; + Long newOffset = recordHandler.seenOffsets.get(tp) + 1; if (!oldOffset.equals(newOffset)) { log.debug("Progress for {}: {} -> {}", tp, oldOffset, newOffset); @@ -329,16 +326,16 @@ abstract class GenericApplicationTests { Properties props; props = new Properties(); - props.put("bootstrap.servers", properties.getBootstrapServer()); + props.put("bootstrap.servers", applicationProperties.getBootstrapServer()); props.put("linger.ms", 100); props.put("key.serializer", BytesSerializer.class.getName()); props.put("value.serializer", BytesSerializer.class.getName()); testRecordProducer = new KafkaProducer<>(props); props = new Properties(); - props.put("bootstrap.servers", properties.getBootstrapServer()); + props.put("bootstrap.servers", applicationProperties.getBootstrapServer()); props.put("client.id", "OFFSET-CONSUMER"); - props.put("group.id", properties.getGroupId()); + props.put("group.id", applicationProperties.getGroupId()); props.put("key.deserializer", BytesDeserializer.class.getName()); props.put("value.deserializer", BytesDeserializer.class.getName()); offsetConsumer = new KafkaConsumer<>(props); @@ -347,43 +344,30 @@ abstract class GenericApplicationTests seekToEnd(); oldOffsets = new HashMap<>(); - seenOffsets = new HashMap<>(); - receivedRecords = new HashSet<>(); + recordHandler.seenOffsets = new HashMap<>(); + recordHandler.receivedRecords = new HashSet<>(); doForCurrentOffsets((tp, offset) -> { oldOffsets.put(tp, offset - 1); - seenOffsets.put(tp, offset - 1); + recordHandler.seenOffsets.put(tp, offset - 1); }); - TestRecordHandler captureOffsetAndExecuteTestHandler = - new TestRecordHandler(recordHandler) - { - @Override - public void onNewRecord(ConsumerRecord record) - { - seenOffsets.put( - new TopicPartition(record.topic(), record.partition()), - record.offset()); - receivedRecords.add(record); - } - }; - - endlessConsumer = - new EndlessConsumer<>( - executor, - properties.getClientId(), - properties.getTopic(), - kafkaConsumer, - rebalanceListener, - captureOffsetAndExecuteTestHandler); - endlessConsumer.start(); } @AfterEach public void deinit() { + try + { + endlessConsumer.stop(); + } + catch (Exception e) + { + log.debug("{}", e.toString()); + } + try { testRecordProducer.close(); @@ -400,5 +384,10 @@ abstract class GenericApplicationTests @Import(ApplicationConfiguration.class) public static class Configuration { + @Bean + public RecordHandler recordHandler(RecordHandler applicationRecordHandler) + { + return new TestRecordHandler(applicationRecordHandler); + } } }