From fe867d6d14fd90aab95bdd7ba9374a585c268d3f Mon Sep 17 00:00:00 2001
From: Kai Moritz
Date: Sat, 9 Apr 2022 16:14:47 +0200
Subject: [PATCH] Tests: Implemented a first version of a synchronous
 integration test

---
 pom.xml                                       |   5 +
 .../java/de/juplo/kafka/ApplicationTests.java | 197 ++++++++++++++++++
 2 files changed, 202 insertions(+)
 create mode 100644 src/test/java/de/juplo/kafka/ApplicationTests.java

diff --git a/pom.xml b/pom.xml
index 8c5dccc..c01ed2c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -47,6 +47,11 @@
 		<dependency>
 			<groupId>org.springframework.boot</groupId>
 			<artifactId>spring-boot-starter-test</artifactId>
 			<scope>test</scope>
 		</dependency>
+		<dependency>
+			<groupId>org.springframework.kafka</groupId>
+			<artifactId>spring-kafka-test</artifactId>
+			<scope>test</scope>
+		</dependency>
 	</dependencies>
diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java
new file mode 100644
index 0000000..21d1668
--- /dev/null
+++ b/src/test/java/de/juplo/kafka/ApplicationTests.java
@@ -0,0 +1,197 @@
+package de.juplo.kafka;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.serialization.BytesSerializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Import;
+import org.springframework.kafka.test.context.EmbeddedKafka;
+import org.springframework.test.context.TestPropertySource;
+import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
+
+import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static de.juplo.kafka.ApplicationTests.PARTITIONS;
+import static de.juplo.kafka.ApplicationTests.TOPIC;
+import static org.assertj.core.api.Assertions.assertThat;
+
+
+@SpringJUnitConfig(initializers = ConfigDataApplicationContextInitializer.class)
+@TestPropertySource(
+    properties = {
+        "consumer.bootstrap-server=${spring.embedded.kafka.brokers}",
+        "consumer.topic=" + TOPIC })
+@EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
+@Slf4j
+class ApplicationTests
+{
+  public static final String TOPIC = "FOO";
+  public static final int PARTITIONS = 10;
+
+
+  StringSerializer stringSerializer = new StringSerializer();
+  LongSerializer longSerializer = new LongSerializer();
+
+  @Autowired
+  KafkaProducer<String, Bytes> kafkaProducer;
+  @Autowired
+  KafkaConsumer<String, Long> kafkaConsumer;
+  @Autowired
+  ApplicationProperties properties;
+  @Autowired
+  ExecutorService executor;
+
+
+  @Test
+  void commitsCurrentOffsetsOnSuccess()
+  {
+    send100Messages(i -> new Bytes(longSerializer.serialize(TOPIC, i)));
+
+    Set<ConsumerRecord<String, Long>> received = new HashSet<>();
+    Map<Integer, Long> offsets = runEndlessConsumer(record ->
+    {
+      received.add(record);
+      if (received.size() == 100)
+        throw new WakeupException();
+    });
+
+    check(offsets);
+  }
+
+  @Test
+  void commitsNoOffsetsOnError()
+  {
+    send100Messages(counter ->
+        counter == 77
+            ? new Bytes(stringSerializer.serialize(TOPIC, "BOOM!"))
+            : new Bytes(longSerializer.serialize(TOPIC, counter)));
+
+    Map<Integer, Long> oldOffsets = new HashMap<>();
+    doForCurrentOffsets((tp, offset) -> oldOffsets.put(tp.partition(), offset - 1));
+    Map<Integer, Long> newOffsets = runEndlessConsumer((record) -> {});
+
+    check(oldOffsets);
+  }
+
+
+  void send100Messages(Function<Long, Bytes> messageGenerator)
+  {
+    long i = 0;
+
+    for (int partition = 0; partition < 10; partition++)
+    {
+      for (int key = 0; key < 10; key++)
+      {
+        Bytes value = messageGenerator.apply(++i);
+
+        ProducerRecord<String, Bytes> record =
+            new ProducerRecord<>(
+                TOPIC,
+                partition,
+                Integer.toString(key%2),
+                value);
+
+        kafkaProducer.send(record, (metadata, e) ->
+        {
+          if (metadata != null)
+          {
+            log.debug(
+                "{}|{} - {}={}",
+                metadata.partition(),
+                metadata.offset(),
+                record.key(),
+                record.value());
+          }
+          else
+          {
+            log.warn(
+                "Exception for {}={}: {}",
+                record.key(),
+                record.value(),
+                e.toString());
+          }
+        });
+      }
+    }
+  }
+
+  Map<Integer, Long> runEndlessConsumer(Consumer<ConsumerRecord<String, Long>> consumer)
+  {
+    Map<Integer, Long> offsets = new HashMap<>();
+    doForCurrentOffsets((tp, offset) -> offsets.put(tp.partition(), offset - 1));
+    Consumer<ConsumerRecord<String, Long>> captureOffset =
+        record -> offsets.put(record.partition(), record.offset());
+    EndlessConsumer<String, Long> endlessConsumer =
+        new EndlessConsumer<>(
+            executor,
+            properties.getClientId(),
+            properties.getTopic(),
+            kafkaConsumer,
+            captureOffset.andThen(consumer));
+
+    endlessConsumer.run();
+
+    return offsets;
+  }
+
+  List<TopicPartition> partitions()
+  {
+    return
+        IntStream
+            .range(0, PARTITIONS)
+            .mapToObj(partition -> new TopicPartition(TOPIC, partition))
+            .collect(Collectors.toList());
+  }
+
+  void doForCurrentOffsets(BiConsumer<TopicPartition, Long> consumer)
+  {
+    kafkaConsumer.assign(partitions());
+    partitions().forEach(tp -> consumer.accept(tp, kafkaConsumer.position(tp)));
+    kafkaConsumer.unsubscribe();
+  }
+
+  void check(Map<Integer, Long> offsets)
+  {
+    doForCurrentOffsets((tp, offset) ->
+    {
+      Long expected = offsets.get(tp.partition()) + 1;
+      log.debug("Checking, if the offset for {} is {}", tp, expected);
+      assertThat(offset).isEqualTo(expected);
+    });
+  }
+
+
+  @TestConfiguration
+  @Import(ApplicationConfiguration.class)
+  public static class Configuration
+  {
+    @Bean
+    KafkaProducer<String, Bytes> kafkaProducer(ApplicationProperties properties)
+    {
+      Properties props = new Properties();
+      props.put("bootstrap.servers", properties.getBootstrapServer());
+      props.put("linger.ms", 100);
+      props.put("key.serializer", StringSerializer.class.getName());
+      props.put("value.serializer", BytesSerializer.class.getName());
+
+      return new KafkaProducer<>(props);
+    }
+  }
+}
-- 
2.20.1
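
Note: the test drives two classes from the main source set that this patch does
not touch: ApplicationProperties and EndlessConsumer. For orientation, below is
a minimal sketch of the EndlessConsumer shape that runEndlessConsumer() assumes
-- a constructor taking the executor, client id, topic, KafkaConsumer, and a
per-record handler, plus a blocking run() that treats a WakeupException as the
shutdown signal. It is inferred purely from the calls made in the test, not
taken from the tutorial's actual implementation, which may differ; the commit
behaviour in particular depends on the consumer configuration in
ApplicationConfiguration, which is not shown in this patch.

    package de.juplo.kafka;

    import lombok.extern.slf4j.Slf4j;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.errors.WakeupException;

    import java.time.Duration;
    import java.util.Arrays;
    import java.util.concurrent.ExecutorService;
    import java.util.function.Consumer;

    // Hypothetical sketch of the class the test depends on; the real
    // EndlessConsumer lives in src/main and is not part of this diff.
    @Slf4j
    public class EndlessConsumer<K, V> implements Runnable
    {
      private final ExecutorService executor;
      private final String id;
      private final String topic;
      private final KafkaConsumer<K, V> consumer;
      private final Consumer<ConsumerRecord<K, V>> handler;

      public EndlessConsumer(
          ExecutorService executor,
          String id,
          String topic,
          KafkaConsumer<K, V> consumer,
          Consumer<ConsumerRecord<K, V>> handler)
      {
        this.executor = executor;
        this.id = id;
        this.topic = topic;
        this.consumer = consumer;
        this.handler = handler;
      }

      @Override
      public void run()
      {
        try
        {
          consumer.subscribe(Arrays.asList(topic));

          // Poll and hand every record to the handler until something throws.
          for (;;)
          {
            ConsumerRecords<K, V> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<K, V> record : records)
              handler.accept(record);
          }
        }
        catch (WakeupException e)
        {
          // Orderly shutdown: the success-case handler in the test throws this
          // after 100 records. Committing here (assuming auto-commit is
          // disabled -- an assumption, since ApplicationConfiguration is not
          // shown) matches what the two tests probe: offsets are committed on
          // a clean shutdown only, while any other exception (e.g. the
          // deserialization failure triggered by the "BOOM!" record) escapes
          // without committing.
          log.info("{} - shutting down, committing offsets", id);
          consumer.commitSync();
        }
        finally
        {
          consumer.close();
        }
      }

      // The test calls run() directly so that it blocks; in the application
      // the executor would presumably run the loop on its own thread:
      public void start()
      {
        executor.submit(this);
      }
    }

Under this reading, runEndlessConsumer() only returns once the poll loop has
terminated, which is what makes the integration test synchronous.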