From bcfa96532b0a33d4fb97392aefe0d1cf7d886896 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 21 Aug 2022 18:37:13 +0200 Subject: [PATCH] =?utf8?q?Tests=20f=C3=BCr=20EndlessConsumer=20in=20Vorlag?= =?utf8?q?e=20f=C3=BCr=20Summenformel-=C3=9Cbung=20entfernt?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- .../java/de/juplo/kafka/ApplicationTests.java | 154 ------- .../ErrorCannotBeGeneratedCondition.java | 60 --- .../juplo/kafka/GenericApplicationTests.java | 402 ------------------ .../kafka/SkipWhenErrorCannotBeGenerated.java | 15 - .../de/juplo/kafka/TestRecordHandler.java | 22 - 5 files changed, 653 deletions(-) delete mode 100644 src/test/java/de/juplo/kafka/ApplicationTests.java delete mode 100644 src/test/java/de/juplo/kafka/ErrorCannotBeGeneratedCondition.java delete mode 100644 src/test/java/de/juplo/kafka/GenericApplicationTests.java delete mode 100644 src/test/java/de/juplo/kafka/SkipWhenErrorCannotBeGenerated.java delete mode 100644 src/test/java/de/juplo/kafka/TestRecordHandler.java diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java deleted file mode 100644 index 6a037eb..0000000 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ /dev/null @@ -1,154 +0,0 @@ -package de.juplo.kafka; - -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.common.utils.Bytes; -import org.springframework.beans.factory.annotation.Autowired; - -import java.util.*; -import java.util.function.Consumer; -import java.util.stream.Collectors; -import java.util.stream.IntStream; - -import static org.assertj.core.api.Assertions.assertThat; - - -@Slf4j -public class ApplicationTests extends GenericApplicationTests -{ - @Autowired - StateRepository stateRepository; - - - public ApplicationTests() - { - super(new ApplicationTestRecrodGenerator()); - ((ApplicationTestRecrodGenerator)recordGenerator).tests = this; - } - - - static class ApplicationTestRecrodGenerator implements RecordGenerator - { - ApplicationTests tests; - - final int[] numbers = {1, 77, 33, 2, 66, 666, 11}; - final String[] dieWilden13 = - IntStream - .range(1, 14) - .mapToObj(i -> "seeräuber-" + i) - .toArray(i -> new String[i]); - final StringSerializer stringSerializer = new StringSerializer(); - final Bytes calculateMessage = new Bytes(stringSerializer.serialize(TOPIC, "CALCULATE")); - - int counter = 0; - - Map> state; - - @Override - public int generate( - boolean poisonPills, - boolean logicErrors, - Consumer> messageSender) - { - counter = 0; - state = - Arrays - .stream(dieWilden13) - .collect(Collectors.toMap( - seeräuber -> seeräuber, - seeräuber -> new LinkedList())); - - int number[] = { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 }; - int message[] = { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 }; - int next = 0; - - for (int pass = 0; pass < 333; pass++) - { - for (int i = 0; i<13; i++) - { - String seeräuber = dieWilden13[i]; - Bytes key = new Bytes(stringSerializer.serialize(TOPIC, seeräuber)); - - if (message[i] > number[i]) - { - send(key, calculateMessage, fail(logicErrors, pass, counter), messageSender); - state.get(seeräuber).add(new AdderResult(number[i], (number[i] + 1) * number[i] / 2)); - // Pick next number to calculate - number[i] = numbers[next++%numbers.length]; - message[i] = 1; - log.debug("Seeräuber {} will die Summe für {} berechnen", seeräuber, number[i]); - } - - Bytes value = new Bytes(stringSerializer.serialize(TOPIC, Integer.toString(message[i]++))); - send(key, value, fail(logicErrors, pass, counter), messageSender); - } - } - - return counter; - } - - boolean fail (boolean logicErrors, int pass, int counter) - { - return logicErrors && pass > 300 && counter%77 == 0; - } - - void send( - Bytes key, - Bytes value, - boolean fail, - Consumer> messageSender) - { - counter++; - - if (fail) - { - value = new Bytes(stringSerializer.serialize(TOPIC, Integer.toString(-1))); - } - - messageSender.accept(new ProducerRecord<>(TOPIC, key, value)); - } - - @Override - public boolean canGeneratePoisonPill() - { - return false; - } - - @Override - public void assertBusinessLogic() - { - for (int i=0; i - { - String user = entry.getKey(); - List resultsForUser = entry.getValue(); - - for (int j=0; j < resultsForUser.size(); j++) - { - if (!(j < state.get(user).size())) - { - break; - } - - assertThat(resultsForUser.get(j)) - .as("Unexpected results calculation %d of user %s", j, user) - .isEqualTo(state.get(user).get(j)); - } - - assertThat(state.get(user)) - .as("More results calculated for user %s as expected", user) - .containsAll(resultsForUser); - }); - } - } - } -} diff --git a/src/test/java/de/juplo/kafka/ErrorCannotBeGeneratedCondition.java b/src/test/java/de/juplo/kafka/ErrorCannotBeGeneratedCondition.java deleted file mode 100644 index 606218f..0000000 --- a/src/test/java/de/juplo/kafka/ErrorCannotBeGeneratedCondition.java +++ /dev/null @@ -1,60 +0,0 @@ -package de.juplo.kafka; - -import org.junit.jupiter.api.extension.ConditionEvaluationResult; -import org.junit.jupiter.api.extension.ExecutionCondition; -import org.junit.jupiter.api.extension.ExtensionContext; -import org.junit.platform.commons.util.AnnotationUtils; - -import java.util.LinkedList; -import java.util.List; -import java.util.Optional; -import java.util.stream.Collectors; - - -public class ErrorCannotBeGeneratedCondition implements ExecutionCondition -{ - @Override - public ConditionEvaluationResult evaluateExecutionCondition(ExtensionContext context) - { - final Optional optional = - AnnotationUtils.findAnnotation( - context.getElement(), - SkipWhenErrorCannotBeGenerated.class); - - if (context.getTestInstance().isEmpty()) - return ConditionEvaluationResult.enabled("Test-instance ist not available"); - - if (optional.isPresent()) - { - SkipWhenErrorCannotBeGenerated skipWhenErrorCannotBeGenerated = optional.get(); - GenericApplicationTests instance = (GenericApplicationTests)context.getTestInstance().get(); - List missingRequiredErrors = new LinkedList<>(); - - if (skipWhenErrorCannotBeGenerated.poisonPill() && !instance.recordGenerator.canGeneratePoisonPill()) - missingRequiredErrors.add("Poison-Pill"); - - if (skipWhenErrorCannotBeGenerated.logicError() && !instance.recordGenerator.canGenerateLogicError()) - missingRequiredErrors.add("Logic-Error"); - - StringBuilder builder = new StringBuilder(); - builder.append(context.getTestClass().get().getSimpleName()); - - if (missingRequiredErrors.isEmpty()) - { - builder.append(" can generate all required types of errors"); - return ConditionEvaluationResult.enabled(builder.toString()); - } - - builder.append(" cannot generate the required error(s): "); - builder.append( - missingRequiredErrors - .stream() - .collect(Collectors.joining(", "))); - - return ConditionEvaluationResult.disabled(builder.toString()); - } - - return ConditionEvaluationResult.enabled( - "Not annotated with " + SkipWhenErrorCannotBeGenerated.class.getSimpleName()); - } -} diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java deleted file mode 100644 index e63c5ce..0000000 --- a/src/test/java/de/juplo/kafka/GenericApplicationTests.java +++ /dev/null @@ -1,402 +0,0 @@ -package de.juplo.kafka; - -import com.mongodb.client.MongoClient; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.RecordDeserializationException; -import org.apache.kafka.common.serialization.*; -import org.apache.kafka.common.utils.Bytes; -import org.junit.jupiter.api.*; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.autoconfigure.EnableAutoConfiguration; -import org.springframework.boot.autoconfigure.mongo.MongoProperties; -import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo; -import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer; -import org.springframework.boot.test.context.TestConfiguration; -import org.springframework.context.annotation.Import; -import org.springframework.kafka.test.context.EmbeddedKafka; -import org.springframework.test.context.TestPropertySource; -import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; - -import java.time.Duration; -import java.util.*; -import java.util.concurrent.ExecutorService; -import java.util.function.BiConsumer; -import java.util.function.Consumer; -import java.util.stream.Collectors; -import java.util.stream.IntStream; - -import static de.juplo.kafka.GenericApplicationTests.PARTITIONS; -import static de.juplo.kafka.GenericApplicationTests.TOPIC; -import static org.assertj.core.api.Assertions.*; -import static org.awaitility.Awaitility.*; - - -@SpringJUnitConfig(initializers = ConfigDataApplicationContextInitializer.class) -@TestPropertySource( - properties = { - "sumup.adder.bootstrap-server=${spring.embedded.kafka.brokers}", - "sumup.adder.topic=" + TOPIC, - "sumup.adder.commit-interval=500ms", - "spring.mongodb.embedded.version=4.4.13" }) -@EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS) -@EnableAutoConfiguration -@AutoConfigureDataMongo -@Slf4j -abstract class GenericApplicationTests -{ - public static final String TOPIC = "FOO"; - public static final int PARTITIONS = 10; - - - @Autowired - KafkaConsumer kafkaConsumer; - @Autowired - Consumer> consumer; - @Autowired - ApplicationProperties properties; - @Autowired - ExecutorService executor; - @Autowired - MongoClient mongoClient; - @Autowired - MongoProperties mongoProperties; - @Autowired - RecordHandler recordHandler; - - KafkaProducer testRecordProducer; - KafkaConsumer offsetConsumer; - EndlessConsumer endlessConsumer; - Map oldOffsets; - Map seenOffsets; - Set> receivedRecords; - - - final RecordGenerator recordGenerator; - final Consumer> messageSender; - - public GenericApplicationTests(RecordGenerator recordGenerator) - { - this.recordGenerator = recordGenerator; - this.messageSender = (record) -> sendMessage(record); - } - - - /** Tests methods */ - - @Test - void commitsCurrentOffsetsOnSuccess() throws Exception - { - int numberOfGeneratedMessages = - recordGenerator.generate(false, false, messageSender); - - await(numberOfGeneratedMessages + " records received") - .atMost(Duration.ofSeconds(30)) - .pollInterval(Duration.ofSeconds(1)) - .until(() -> receivedRecords.size() >= numberOfGeneratedMessages); - - await("Offsets committed") - .atMost(Duration.ofSeconds(10)) - .pollInterval(Duration.ofSeconds(1)) - .untilAsserted(() -> - { - checkSeenOffsetsForProgress(); - assertSeenOffsetsEqualCommittedOffsets(seenOffsets); - }); - - assertThatExceptionOfType(IllegalStateException.class) - .isThrownBy(() -> endlessConsumer.exitStatus()) - .describedAs("Consumer should still be running"); - - endlessConsumer.stop(); - recordGenerator.assertBusinessLogic(); - } - - @Test - @SkipWhenErrorCannotBeGenerated(poisonPill = true) - void commitsOffsetOfErrorForReprocessingOnDeserializationError() - { - int numberOfGeneratedMessages = - recordGenerator.generate(true, false, messageSender); - - await("Consumer failed") - .atMost(Duration.ofSeconds(30)) - .pollInterval(Duration.ofSeconds(1)) - .until(() -> !endlessConsumer.running()); - - checkSeenOffsetsForProgress(); - assertSeenOffsetsEqualCommittedOffsets(seenOffsets); - - endlessConsumer.start(); - await("Consumer failed") - .atMost(Duration.ofSeconds(30)) - .pollInterval(Duration.ofSeconds(1)) - .until(() -> !endlessConsumer.running()); - - checkSeenOffsetsForProgress(); - assertSeenOffsetsEqualCommittedOffsets(seenOffsets); - assertThat(receivedRecords.size()) - .describedAs("Received not all sent events") - .isLessThan(numberOfGeneratedMessages); - - assertThatNoException() - .describedAs("Consumer should not be running") - .isThrownBy(() -> endlessConsumer.exitStatus()); - assertThat(endlessConsumer.exitStatus()) - .describedAs("Consumer should have exited abnormally") - .containsInstanceOf(RecordDeserializationException.class); - - recordGenerator.assertBusinessLogic(); - } - - @Test - @SkipWhenErrorCannotBeGenerated(logicError = true) - void doesNotCommitOffsetsOnLogicError() - { - int numberOfGeneratedMessages = - recordGenerator.generate(false, true, messageSender); - - await("Consumer failed") - .atMost(Duration.ofSeconds(30)) - .pollInterval(Duration.ofSeconds(1)) - .until(() -> !endlessConsumer.running()); - - checkSeenOffsetsForProgress(); - assertSeenOffsetsAreBehindCommittedOffsets(seenOffsets); - - endlessConsumer.start(); - await("Consumer failed") - .atMost(Duration.ofSeconds(30)) - .pollInterval(Duration.ofSeconds(1)) - .until(() -> !endlessConsumer.running()); - - assertSeenOffsetsAreBehindCommittedOffsets(seenOffsets); - - assertThatNoException() - .describedAs("Consumer should not be running") - .isThrownBy(() -> endlessConsumer.exitStatus()); - assertThat(endlessConsumer.exitStatus()) - .describedAs("Consumer should have exited abnormally") - .containsInstanceOf(RuntimeException.class); - - recordGenerator.assertBusinessLogic(); - } - - - /** Helper methods for the verification of expectations */ - - void assertSeenOffsetsEqualCommittedOffsets(Map offsetsToCheck) - { - doForCurrentOffsets((tp, offset) -> - { - Long expected = offsetsToCheck.get(tp) + 1; - log.debug("Checking, if the offset {} for {} is exactly {}", offset, tp, expected); - assertThat(offset) - .describedAs("Committed offset corresponds to the offset of the consumer") - .isEqualTo(expected); - }); - } - - void assertSeenOffsetsAreBehindCommittedOffsets(Map offsetsToCheck) - { - List isOffsetBehindSeen = new LinkedList<>(); - - doForCurrentOffsets((tp, offset) -> - { - Long expected = offsetsToCheck.get(tp) + 1; - log.debug("Checking, if the offset {} for {} is at most {}", offset, tp, expected); - assertThat(offset) - .describedAs("Committed offset must be at most equal to the offset of the consumer") - .isLessThanOrEqualTo(expected); - isOffsetBehindSeen.add(offset < expected); - }); - - assertThat(isOffsetBehindSeen.stream().reduce(false, (result, next) -> result | next)) - .describedAs("Committed offsets are behind seen offsets") - .isTrue(); - } - - void checkSeenOffsetsForProgress() - { - // Be sure, that some messages were consumed...! - Set withProgress = new HashSet<>(); - partitions().forEach(tp -> - { - Long oldOffset = oldOffsets.get(tp) + 1; - Long newOffset = seenOffsets.get(tp) + 1; - if (!oldOffset.equals(newOffset)) - { - log.debug("Progress for {}: {} -> {}", tp, oldOffset, newOffset); - withProgress.add(tp); - } - }); - assertThat(withProgress) - .describedAs("Some offsets must have changed, compared to the old offset-positions") - .isNotEmpty(); - } - - - /** Helper methods for setting up and running the tests */ - - void seekToEnd() - { - offsetConsumer.assign(partitions()); - offsetConsumer.seekToEnd(partitions()); - partitions().forEach(tp -> - { - // seekToEnd() works lazily: it only takes effect on poll()/position() - Long offset = offsetConsumer.position(tp); - log.info("New position for {}: {}", tp, offset); - }); - // The new positions must be commited! - offsetConsumer.commitSync(); - offsetConsumer.unsubscribe(); - } - - void doForCurrentOffsets(BiConsumer consumer) - { - offsetConsumer.assign(partitions()); - partitions().forEach(tp -> consumer.accept(tp, offsetConsumer.position(tp))); - offsetConsumer.unsubscribe(); - } - - List partitions() - { - return - IntStream - .range(0, PARTITIONS) - .mapToObj(partition -> new TopicPartition(TOPIC, partition)) - .collect(Collectors.toList()); - } - - - public interface RecordGenerator - { - int generate( - boolean poisonPills, - boolean logicErrors, - Consumer> messageSender); - - default boolean canGeneratePoisonPill() - { - return true; - } - - default boolean canGenerateLogicError() - { - return true; - } - - default void assertBusinessLogic() - { - log.debug("No business-logic to assert"); - } - } - - void sendMessage(ProducerRecord record) - { - testRecordProducer.send(record, (metadata, e) -> - { - if (metadata != null) - { - log.debug( - "{}|{} - {}={}", - metadata.partition(), - metadata.offset(), - record.key(), - record.value()); - } - else - { - log.warn( - "Exception for {}={}: {}", - record.key(), - record.value(), - e.toString()); - } - }); - } - - - @BeforeEach - public void init() - { - Properties props; - props = new Properties(); - props.put("bootstrap.servers", properties.getBootstrapServer()); - props.put("linger.ms", 100); - props.put("key.serializer", BytesSerializer.class.getName()); - props.put("value.serializer", BytesSerializer.class.getName()); - testRecordProducer = new KafkaProducer<>(props); - - props = new Properties(); - props.put("bootstrap.servers", properties.getBootstrapServer()); - props.put("client.id", "OFFSET-CONSUMER"); - props.put("group.id", properties.getGroupId()); - props.put("key.deserializer", BytesDeserializer.class.getName()); - props.put("value.deserializer", BytesDeserializer.class.getName()); - offsetConsumer = new KafkaConsumer<>(props); - - mongoClient.getDatabase(mongoProperties.getDatabase()).drop(); - seekToEnd(); - - oldOffsets = new HashMap<>(); - seenOffsets = new HashMap<>(); - receivedRecords = new HashSet<>(); - - doForCurrentOffsets((tp, offset) -> - { - oldOffsets.put(tp, offset - 1); - seenOffsets.put(tp, offset - 1); - }); - - TestRecordHandler captureOffsetAndExecuteTestHandler = - new TestRecordHandler(recordHandler) - { - @Override - public void onNewRecord(ConsumerRecord record) - { - seenOffsets.put( - new TopicPartition(record.topic(), record.partition()), - record.offset()); - receivedRecords.add(record); - } - }; - - endlessConsumer = - new EndlessConsumer<>( - executor, - properties.getClientId(), - properties.getTopic(), - kafkaConsumer, - captureOffsetAndExecuteTestHandler); - - endlessConsumer.start(); - } - - @AfterEach - public void deinit() - { - try - { - testRecordProducer.close(); - offsetConsumer.close(); - } - catch (Exception e) - { - log.info("Exception while stopping the consumer: {}", e.toString()); - } - } - - - @TestConfiguration - @Import(ApplicationConfiguration.class) - public static class Configuration - { - } -} diff --git a/src/test/java/de/juplo/kafka/SkipWhenErrorCannotBeGenerated.java b/src/test/java/de/juplo/kafka/SkipWhenErrorCannotBeGenerated.java deleted file mode 100644 index 6d15e9e..0000000 --- a/src/test/java/de/juplo/kafka/SkipWhenErrorCannotBeGenerated.java +++ /dev/null @@ -1,15 +0,0 @@ -package de.juplo.kafka; - -import org.junit.jupiter.api.extension.ExtendWith; - -import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; - - -@Retention(RetentionPolicy.RUNTIME) -@ExtendWith(ErrorCannotBeGeneratedCondition.class) -public @interface SkipWhenErrorCannotBeGenerated -{ - boolean poisonPill() default false; - boolean logicError() default false; -} diff --git a/src/test/java/de/juplo/kafka/TestRecordHandler.java b/src/test/java/de/juplo/kafka/TestRecordHandler.java deleted file mode 100644 index b4efdd6..0000000 --- a/src/test/java/de/juplo/kafka/TestRecordHandler.java +++ /dev/null @@ -1,22 +0,0 @@ -package de.juplo.kafka; - -import lombok.RequiredArgsConstructor; -import org.apache.kafka.clients.consumer.ConsumerRecord; - - -@RequiredArgsConstructor -public abstract class TestRecordHandler implements RecordHandler -{ - private final RecordHandler handler; - - - public abstract void onNewRecord(ConsumerRecord record); - - - @Override - public void accept(ConsumerRecord record) - { - this.onNewRecord(record); - handler.accept(record); - } -} -- 2.20.1