Tests: Implemented a first version of a synchronous integration test
author Kai Moritz <kai@juplo.de>
Sat, 9 Apr 2022 14:14:47 +0000 (16:14 +0200)
committer Kai Moritz <kai@juplo.de>
Mon, 11 Apr 2022 13:23:50 +0000 (15:23 +0200)
pom.xml
src/test/java/de/juplo/kafka/ApplicationTests.java [new file with mode: 0644]
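
The new test class boots the application's consumer against an embedded Kafka broker, seeds the topic with 100 messages, runs the EndlessConsumer synchronously on the calling thread and afterwards compares the committed offsets with the offsets that were actually processed. It should run as part of the normal Maven build (mvn test); no external broker is needed.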

diff --git a/pom.xml b/pom.xml
index 8c5dccc..c01ed2c 100644 (file)
--- a/pom.xml
+++ b/pom.xml
       <artifactId>spring-boot-starter-test</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.springframework.kafka</groupId>
+      <artifactId>spring-kafka-test</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
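
spring-kafka-test provides the @EmbeddedKafka annotation used by the new test class: it starts an in-process Kafka broker for the duration of the test and publishes its address in the spring.embedded.kafka.brokers property, which the test maps onto consumer.bootstrap-server.
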
diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java
new file mode 100644 (file)
index 0000000..21d1668
--- /dev/null
@@ -0,0 +1,197 @@
+package de.juplo.kafka;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.serialization.BytesSerializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Import;
+import org.springframework.kafka.test.context.EmbeddedKafka;
+import org.springframework.test.context.TestPropertySource;
+import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
+
+import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static de.juplo.kafka.ApplicationTests.PARTITIONS;
+import static de.juplo.kafka.ApplicationTests.TOPIC;
+import static org.assertj.core.api.Assertions.assertThat;
+
+
+@SpringJUnitConfig(initializers = ConfigDataApplicationContextInitializer.class)
+@TestPropertySource(
+               properties = {
+                               "consumer.bootstrap-server=${spring.embedded.kafka.brokers}",
+                               "consumer.topic=" + TOPIC })
+@EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
+@Slf4j
+class ApplicationTests
+{
+       public static final String TOPIC = "FOO";
+       public static final int PARTITIONS = 10;
+
+
+       StringSerializer stringSerializer = new StringSerializer();
+       LongSerializer longSerializer = new LongSerializer();
+
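+	// kafkaConsumer, properties and the executor are provided by the imported
+	// ApplicationConfiguration; the kafkaProducer used to seed the topic is
+	// defined in the nested Configuration class below.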
+       @Autowired
+       KafkaProducer<String, Bytes> kafkaProducer;
+       @Autowired
+       KafkaConsumer<String, Long> kafkaConsumer;
+       @Autowired
+       ApplicationProperties properties;
+       @Autowired
+       ExecutorService executor;
+
+
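+	// Happy path: all 100 records can be deserialized. The handler stops the
+	// consumer after the last record, so the committed offsets must match the
+	// offsets of the processed records.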
+       @Test
+       void commitsCurrentOffsetsOnSuccess()
+       {
+		send100Messages(i -> new Bytes(longSerializer.serialize(TOPIC, i)));
+
+               Set<ConsumerRecord<String, Long>> received = new HashSet<>();
+               Map<Integer, Long> offsets = runEndlessConsumer(record ->
+               {
+                       received.add(record);
+                       if (received.size() == 100)
+                               throw new WakeupException();
+               });
+
+               check(offsets);
+       }
+
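+	// Error path: message no. 77 is a String and cannot be deserialized as Long.
+	// The consumer dies before committing anything, so the offsets must still be
+	// at their initial values afterwards.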
+       @Test
+       void commitsNoOffsetsOnError()
+       {
+               send100Messages(counter ->
+                               counter == 77
+                                               ? new Bytes(stringSerializer.serialize(TOPIC, "BOOM!"))
+                                               : new Bytes(longSerializer.serialize(TOPIC, counter)));
+
+               Map<Integer, Long> oldOffsets = new HashMap<>();
+		doForCurrentOffsets((tp, offset) -> oldOffsets.put(tp.partition(), offset - 1));
+               Map<Integer, Long> newOffsets = runEndlessConsumer((record) -> {});
+
+               check(oldOffsets);
+       }
+
+
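+	// Writes 10 records into each of the 10 partitions; the values come from the
+	// given generator, the keys alternate between "0" and "1".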
+       void send100Messages(Function<Long, Bytes> messageGenerator)
+       {
+               long i = 0;
+
+		for (int partition = 0; partition < PARTITIONS; partition++)
+               {
+                       for (int key = 0; key < 10; key++)
+                       {
+                               Bytes value = messageGenerator.apply(++i);
+
+                               ProducerRecord<String, Bytes> record =
+                                               new ProducerRecord<>(
+                                                               TOPIC,
+                                                               partition,
+                                                               Integer.toString(key%2),
+                                                               value);
+
+                               kafkaProducer.send(record, (metadata, e) ->
+                               {
+                                       if (metadata != null)
+                                       {
+                                               log.debug(
+                                                               "{}|{} - {}={}",
+                                                               metadata.partition(),
+                                                               metadata.offset(),
+                                                               record.key(),
+                                                               record.value());
+                                       }
+                                       else
+                                       {
+                                               log.warn(
+                                                               "Exception for {}={}: {}",
+                                                               record.key(),
+                                                               record.value(),
+                                                               e.toString());
+                                       }
+                               });
+                       }
+               }
+       }
+
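+	// Remembers the current position of every partition, then runs the
+	// EndlessConsumer synchronously on the calling thread until the handler (or
+	// an error) terminates it, and returns the offset of the last record that
+	// was seen per partition.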
+       Map<Integer, Long> runEndlessConsumer(Consumer<ConsumerRecord<String, Long>> consumer)
+       {
+               Map<Integer, Long> offsets = new HashMap<>();
+		doForCurrentOffsets((tp, offset) -> offsets.put(tp.partition(), offset - 1));
+               Consumer<ConsumerRecord<String, Long>> captureOffset = record -> offsets.put(record.partition(), record.offset());
+               EndlessConsumer<String, Long> endlessConsumer =
+                               new EndlessConsumer<>(
+                                               executor,
+                                               properties.getClientId(),
+                                               properties.getTopic(),
+                                               kafkaConsumer,
+                                               captureOffset.andThen(consumer));
+
+               endlessConsumer.run();
+
+               return offsets;
+       }
+
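+	// All partitions of the test topic.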
+       List<TopicPartition> partitions()
+       {
+               return
+                               IntStream
+                                               .range(0, PARTITIONS)
+                                               .mapToObj(partition -> new TopicPartition(TOPIC, partition))
+                                               .collect(Collectors.toList());
+       }
+
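+	// Assigns all partitions to the consumer and reports each partition's current
+	// position (the offset of the next record that would be fetched) to the callback.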
+       void doForCurrentOffsets(BiConsumer<TopicPartition, Long> consumer)
+       {
+               kafkaConsumer.assign(partitions());
+               partitions().forEach(tp -> consumer.accept(tp, kafkaConsumer.position(tp)));
+               kafkaConsumer.unsubscribe();
+       }
+
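+	// Asserts that every partition's position is exactly one past the offset
+	// recorded for it in the given map.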
+       void check(Map<Integer, Long> offsets)
+       {
+               doForCurrentOffsets((tp, offset) ->
+               {
+                       Long expected = offsets.get(tp.partition()) + 1;
+			log.debug("Checking if the offset for {} is {}", tp, expected);
+                       assertThat(offset).isEqualTo(expected);
+               });
+       }
+
+
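+	// Imports the production configuration and adds the producer that the tests
+	// use to seed the topic.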
+       @TestConfiguration
+       @Import(ApplicationConfiguration.class)
+       public static class Configuration
+       {
+               @Bean
+               KafkaProducer<String, Bytes> kafkaProducer(ApplicationProperties properties)
+               {
+                       Properties props = new Properties();
+                       props.put("bootstrap.servers", properties.getBootstrapServer());
+                       props.put("linger.ms", 100);
+                       props.put("key.serializer", StringSerializer.class.getName());
+                       props.put("value.serializer", BytesSerializer.class.getName());
+
+                       return new KafkaProducer<>(props);
+               }
+       }
+}
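
The EndlessConsumer that the test drives is not part of this diff. As a rough sketch only, reconstructed from the constructor call and the synchronous run() used above (not the actual implementation from src/main/java), the class could look like this:

package de.juplo.kafka;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.errors.WakeupException;

import java.time.Duration;
import java.util.Arrays;
import java.util.concurrent.ExecutorService;


// Hypothetical sketch: only the parts that ApplicationTests relies on.
public class EndlessConsumer<K, V>
{
	private final ExecutorService executor; // only needed for an asynchronous start()/stop() API
	private final String id;
	private final String topic;
	private final Consumer<K, V> consumer;
	private final java.util.function.Consumer<ConsumerRecord<K, V>> handler;

	public EndlessConsumer(
			ExecutorService executor,
			String id,
			String topic,
			Consumer<K, V> consumer,
			java.util.function.Consumer<ConsumerRecord<K, V>> handler)
	{
		this.executor = executor;
		this.id = id;
		this.topic = topic;
		this.consumer = consumer;
		this.handler = handler;
	}

	// Blocks on the calling thread until the handler throws a WakeupException
	// or an unexpected error (e.g. a failed deserialization) ends the loop.
	public void run()
	{
		try
		{
			consumer.subscribe(Arrays.asList(topic));

			for (;;)
			{
				ConsumerRecords<K, V> records = consumer.poll(Duration.ofSeconds(1));
				for (ConsumerRecord<K, V> record : records)
					handler.accept(record);
			}
		}
		catch (WakeupException e)
		{
			// Clean stop requested by the handler: commit the processed offsets,
			// as commitsCurrentOffsetsOnSuccess() expects.
			consumer.commitSync();
		}
		catch (Exception e)
		{
			// Unexpected error: leave without committing, as commitsNoOffsetsOnError() expects.
		}
		finally
		{
			consumer.unsubscribe();
		}
	}
}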