X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationTests.java;h=35d2b2e03d5510c74b784bfc334f66fff8daa1c8;hb=dac4556048bca9bfc505a0c348ec9a46313c8cf1;hp=bf1cdb8515a30053b64bfec64662809c8fa69d43;hpb=f6212abfac1d872979d2a27f5a6bf4708b643db6;p=demos%2Fkafka%2Ftraining

diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java
index bf1cdb8..35d2b2e 100644
--- a/src/test/java/de/juplo/kafka/ApplicationTests.java
+++ b/src/test/java/de/juplo/kafka/ApplicationTests.java
@@ -6,26 +6,26 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.serialization.BytesDeserializer;
 import org.apache.kafka.common.serialization.BytesSerializer;
 import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
-import org.junit.jupiter.api.MethodOrderer;
-import org.junit.jupiter.api.Order;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.TestMethodOrder;
+import org.junit.jupiter.api.*;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
 import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer;
 import org.springframework.boot.test.context.TestConfiguration;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Import;
+import org.springframework.context.annotation.Primary;
 import org.springframework.kafka.test.context.EmbeddedKafka;
 import org.springframework.test.context.TestPropertySource;
 import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
 
+import java.time.Duration;
 import java.util.*;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ExecutionException;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.Function;
@@ -35,9 +35,15 @@ import java.util.stream.IntStream;
 import static de.juplo.kafka.ApplicationTests.PARTITIONS;
 import static de.juplo.kafka.ApplicationTests.TOPIC;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.*;
 
 
-@SpringJUnitConfig(initializers = ConfigDataApplicationContextInitializer.class)
+@SpringJUnitConfig(
+    initializers = ConfigDataApplicationContextInitializer.class,
+    classes = {
+        EndlessConsumer.class,
+        KafkaAutoConfiguration.class,
+        ApplicationTests.Configuration.class })
 @TestMethodOrder(MethodOrderer.OrderAnnotation.class)
 @TestPropertySource(
     properties = {
@@ -57,58 +63,74 @@ class ApplicationTests
   @Autowired
   KafkaProducer<String, Bytes> kafkaProducer;
   @Autowired
-  KafkaConsumer<String, Long> kafkaConsumer;
+  KafkaConsumer<Bytes, Bytes> offsetConsumer;
   @Autowired
   ApplicationProperties properties;
   @Autowired
-  ExecutorService executor;
+  EndlessConsumer<String, Long> endlessConsumer;
+  @Autowired
+  RecordHandler recordHandler;
+
+  Map<TopicPartition, Long> oldOffsets;
+  Map<TopicPartition, Long> newOffsets;
+
+
+  /** Test methods */
 
   @Test
   @Order(1) // << The poison pill is not skipped. Hence, this test must run first
-  void commitsCurrentOffsetsOnSuccess()
+  void commitsCurrentOffsetsOnSuccess() throws ExecutionException, InterruptedException
   {
     send100Messages(i -> new Bytes(longSerializer.serialize(TOPIC, i)));
 
-    Map<TopicPartition, Long> oldOffsets = new HashMap<>();
-    doForCurrentOffsets((tp, offset) -> oldOffsets.put(tp, offset - 1));
     Set<ConsumerRecord<String, Long>> received = new HashSet<>();
-    Map<TopicPartition, Long> newOffsets = runEndlessConsumer(record ->
-    {
-      received.add(record);
-      if (received.size() == 100)
-        throw new WakeupException();
-    });
+    recordHandler.testHandler = record -> received.add(record);
 
-    Set<TopicPartition> withProgress = new HashSet<>();
-    partitions().forEach(tp ->
-    {
-      Long oldOffset = oldOffsets.get(tp);
-      Long newOffset = newOffsets.get(tp);
-      if (!oldOffset.equals(newOffset))
-      {
-        log.debug("Progress for {}: {} -> {}", tp, oldOffset, newOffset);
-        withProgress.add(tp);
-      }
-    });
-    assertThat(withProgress).isNotEmpty().describedAs("Found no partitions with any offset-progress");
+    await("100 records received")
+        .atMost(Duration.ofSeconds(30))
+        .until(() -> received.size() >= 100);
 
-    check(newOffsets);
+    await("Offsets committed")
+        .atMost(Duration.ofSeconds(10))
+        .untilAsserted(() ->
+        {
+          checkSeenOffsetsForProgress();
+          compareToCommitedOffsets(newOffsets);
+        });
   }
 
   @Test
   @Order(2)
-  void commitsNoOffsetsOnError()
+  void commitsOffsetOfErrorForReprocessingOnError()
   {
     send100Messages(counter ->
         counter == 77
             ? new Bytes(stringSerializer.serialize(TOPIC, "BOOM!"))
             : new Bytes(longSerializer.serialize(TOPIC, counter)));
 
-    Map<TopicPartition, Long> oldOffsets = new HashMap<>();
-    doForCurrentOffsets((tp, offset) -> oldOffsets.put(tp, offset - 1));
-    Map<TopicPartition, Long> newOffsets = runEndlessConsumer((record) -> {});
+    await("Consumer failed")
+        .atMost(Duration.ofSeconds(30))
+        .untilAsserted(() -> checkSeenOffsetsForProgress());
+
+    compareToCommitedOffsets(newOffsets);
+  }
+
+
+  /** Helper methods for the verification of expectations */
+
+  void compareToCommitedOffsets(Map<TopicPartition, Long> offsetsToCheck)
+  {
+    doForCurrentOffsets((tp, offset) ->
+    {
+      Long expected = offsetsToCheck.get(tp) + 1;
+      log.debug("Checking if the offset for {} is {}", tp, expected);
+      assertThat(offset).isEqualTo(expected);
+    });
+  }
+
+  void checkSeenOffsetsForProgress()
+  {
+    // Be sure that some messages were consumed!
     Set<TopicPartition> withProgress = new HashSet<>();
     partitions().forEach(tp ->
     {
@@ -121,8 +143,25 @@ class ApplicationTests
       }
     });
     assertThat(withProgress).isNotEmpty().describedAs("Found no partitions with any offset-progress");
+  }
 
-    check(oldOffsets);
+
+  /** Helper methods for setting up and running the tests */
+
+  void doForCurrentOffsets(BiConsumer<TopicPartition, Long> consumer)
+  {
+    offsetConsumer.assign(partitions());
+    partitions().forEach(tp -> consumer.accept(tp, offsetConsumer.position(tp)));
+    offsetConsumer.unsubscribe();
+  }
+
+  List<TopicPartition> partitions()
+  {
+    return
+        IntStream
+            .range(0, PARTITIONS)
+            .mapToObj(partition -> new TopicPartition(TOPIC, partition))
+            .collect(Collectors.toList());
   }
@@ -167,59 +206,69 @@ class ApplicationTests
     }
   }
 
-  Map<TopicPartition, Long> runEndlessConsumer(Consumer<ConsumerRecord<String, Long>> consumer)
+
+  @BeforeEach
+  public void init()
   {
-    Map<TopicPartition, Long> offsets = new HashMap<>();
-    doForCurrentOffsets((tp, offset) -> offsets.put(tp, offset - 1));
-    Consumer<ConsumerRecord<String, Long>> captureOffset =
+    recordHandler.testHandler = (record) -> {};
+
+    oldOffsets = new HashMap<>();
+    newOffsets = new HashMap<>();
+
+    doForCurrentOffsets((tp, offset) ->
+    {
+      oldOffsets.put(tp, offset - 1);
+      newOffsets.put(tp, offset - 1);
+    });
+
+    recordHandler.captureOffsets =
         record ->
-            offsets.put(
-                new TopicPartition(record.topic(), record.partition()),
-                record.offset());
-    EndlessConsumer<String, Long> endlessConsumer =
-        new EndlessConsumer<>(
-            executor,
-            properties.getClientId(),
-            properties.getTopic(),
-            kafkaConsumer,
-            captureOffset.andThen(consumer));
-
-    endlessConsumer.run();
-
-    return offsets;
-  }
+            newOffsets.put(
+                new TopicPartition(record.topic(), record.partition()),
+                record.offset());
 
-  List<TopicPartition> partitions()
-  {
-    return
-        IntStream
-            .range(0, PARTITIONS)
-            .mapToObj(partition -> new TopicPartition(TOPIC, partition))
-            .collect(Collectors.toList());
+    endlessConsumer.start();
   }
 
-  void doForCurrentOffsets(BiConsumer<TopicPartition, Long> consumer)
+  @AfterEach
+  public void deinit()
   {
-    kafkaConsumer.assign(partitions());
-    partitions().forEach(tp -> consumer.accept(tp, kafkaConsumer.position(tp)));
-    kafkaConsumer.unsubscribe();
+    try
+    {
+      endlessConsumer.stop();
+    }
+    catch (Exception e)
+    {
+      log.info("Exception while stopping the consumer: {}", e.toString());
+    }
   }
 
-  void check(Map<TopicPartition, Long> offsets)
+
+  public static class RecordHandler implements Consumer<ConsumerRecord<String, Long>>
   {
-    doForCurrentOffsets((tp, offset) ->
+    Consumer<ConsumerRecord<String, Long>> captureOffsets;
+    Consumer<ConsumerRecord<String, Long>> testHandler;
+
+
+    @Override
+    public void accept(ConsumerRecord<String, Long> record)
     {
-      Long expected = offsets.get(tp) + 1;
-      log.debug("Checking, if the offset for {} is {}", tp, expected);
-      assertThat(offset).isEqualTo(expected);
-    });
+      captureOffsets
+          .andThen(testHandler)
+          .accept(record);
+    }
   }
 
-
   @TestConfiguration
   @Import(ApplicationConfiguration.class)
   public static class Configuration
   {
+    @Primary
+    @Bean
+    public Consumer<ConsumerRecord<String, Long>> testHandler()
+    {
+      return new RecordHandler();
+    }
+
     @Bean
     KafkaProducer<String, Bytes> kafkaProducer(ApplicationProperties properties)
     {
@@ -231,5 +280,18 @@ class ApplicationTests
 
       return new KafkaProducer<>(props);
     }
+
+    @Bean
+    KafkaConsumer<Bytes, Bytes> offsetConsumer(ApplicationProperties properties)
+    {
+      Properties props = new Properties();
+      props.put("bootstrap.servers", properties.getBootstrapServer());
+      props.put("client.id", "OFFSET-CONSUMER");
+      props.put("group.id", properties.getGroupId());
+      props.put("key.deserializer", BytesDeserializer.class.getName());
+      props.put("value.deserializer", BytesDeserializer.class.getName());
+
+      return new KafkaConsumer<>(props);
+    }
   }
 }
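
The heart of this change: instead of terminating the handler by throwing a WakeupException after the 100th record, the tests now let the consumer keep running and poll their expectations with Awaitility. A minimal, self-contained sketch of that polling pattern, assuming only JUnit 5 and Awaitility on the classpath; the background worker and all names here are hypothetical stand-ins, not code from the commit:

import static org.awaitility.Awaitility.await;

import java.time.Duration;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.junit.jupiter.api.Test;

class AwaitilityPatternSketch
{
  // Hypothetical stand-in for the records collected by the test handler
  private final Set<Long> received = ConcurrentHashMap.newKeySet();

  @Test
  void waitsForAsynchronousProgress() throws InterruptedException
  {
    // Simulated background consumer that fills the shared set over time
    Thread worker = new Thread(() ->
    {
      for (long i = 0; i < 100; i++)
        received.add(i);
    });
    worker.start();

    // Poll until the condition holds; if it does not hold within the
    // timeout, Awaitility throws and the test fails
    await("100 records received")
        .atMost(Duration.ofSeconds(30))
        .until(() -> received.size() >= 100);

    worker.join();
  }
}

Because the completion condition now lives in the test rather than in the record handler, the handler no longer has to fail on purpose, and the consumer lifecycle can move into the @BeforeEach/@AfterEach methods introduced by the diff above.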