Tests aus gemerged springified-consumer--serialization -> deserialization
authorKai Moritz <kai@juplo.de>
Sun, 14 Aug 2022 10:06:01 +0000 (12:06 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 14 Aug 2022 10:55:12 +0000 (12:55 +0200)
* Es wurde nur der hinzugefügte Test übernommen.
* Der hinzugefügte Test wurde an das von Spring-Kafka abweichende
  Verhalten bei einem Logik-Fehler angepasst: Kafka führt nicht automatisch
  Seeks oder einene Commit durch. Da `EndlessConsumer` bei einem
  Logik-Fehler explizit ein `unsubscribe()` durchführt, wird kein
  Offset-Commit durchgefürt, so dass die alten Offset-Positionen gültig
  bleiben.
* Der Test wurde entsprechend umbenannt.
* `RecordGenerator` wurde um einen weiteren Integer-Set erweitert, über
  den die Indizes der zu erzeugenden Logik-Fehler gesetzt werden können.
* Der hinzugefügte Test wurde auf die überarbeitete Methode zur Erzeugung
  der Test-Nachrichten umgestellt.
* `ApplicationTest` wurde so ergänzt, dass der für den hinzugefügten Test
  benötigte Logik-Fehler erzeugt wird.

1  2 
src/test/java/de/juplo/kafka/ApplicationTest.java
src/test/java/de/juplo/kafka/GenericApplicationTest.java

index 81165ab,0000000..ed93a21
mode 100644,000000..100644
--- /dev/null
@@@ -1,56 -1,0 +1,84 @@@
-                 Bytes value =
-                     poisonPills.contains(i)
-                         ? new Bytes(stringSerializer.serialize(TOPIC, "BOOM!"))
-                         : new Bytes(longSerializer.serialize(TOPIC, (long)i));
 +package de.juplo.kafka;
 +
++import org.apache.kafka.clients.consumer.ConsumerRecord;
 +import org.apache.kafka.clients.producer.ProducerRecord;
 +import org.apache.kafka.common.serialization.LongSerializer;
 +import org.apache.kafka.common.serialization.StringSerializer;
 +import org.apache.kafka.common.utils.Bytes;
++import org.springframework.boot.test.context.TestConfiguration;
++import org.springframework.context.annotation.Bean;
++import org.springframework.context.annotation.Primary;
++import org.springframework.test.context.ContextConfiguration;
 +
 +import java.util.Set;
 +import java.util.function.Consumer;
 +
 +
++@ContextConfiguration(classes = ApplicationTest.Configuration.class)
 +public class ApplicationTest extends GenericApplicationTest<String, Long>
 +{
 +  public ApplicationTest()
 +  {
 +    super(
 +        new RecordGenerator()
 +        {
 +          final StringSerializer stringSerializer = new StringSerializer();
 +          final LongSerializer longSerializer = new LongSerializer();
 +
 +
 +          @Override
 +          public void generate(
 +              int numberOfMessagesToGenerate,
 +              Set<Integer> poisonPills,
++              Set<Integer> logicErrors,
 +              Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
 +          {
 +            int i = 0;
 +
 +            for (int partition = 0; partition < 10; partition++)
 +            {
 +              for (int key = 0; key < 10; key++)
 +              {
 +                if (++i > numberOfMessagesToGenerate)
 +                  return;
 +
++                Bytes value = new Bytes(longSerializer.serialize(TOPIC, (long)i));
++                if (logicErrors.contains(i))
++                {
++                  value = new Bytes(longSerializer.serialize(TOPIC, Long.MIN_VALUE));
++                }
++                if (poisonPills.contains(i))
++                {
++                  value = new Bytes(stringSerializer.serialize(TOPIC, "BOOM (Poison-Pill)!"));
++                }
 +
 +                ProducerRecord<Bytes, Bytes> record =
 +                    new ProducerRecord<>(
 +                        TOPIC,
 +                        partition,
 +                        new Bytes(stringSerializer.serialize(TOPIC,Integer.toString(partition*10+key%2))),
 +                        value);
 +
 +                messageSender.accept(record);
 +              }
 +            }
 +          }
 +        });
 +  }
++
++
++  @TestConfiguration
++  public static class Configuration
++  {
++    @Primary
++    @Bean
++    public Consumer<ConsumerRecord<String, Long>> consumer()
++    {
++      return (record) ->
++      {
++        if (record.value() == Long.MIN_VALUE)
++          throw new RuntimeException("BOOM (Logic-Error)!");
++      };
++    }
++  }
 +}
index 68f150f,0000000..a6d6aa1
mode 100644,000000..100644
--- /dev/null
@@@ -1,309 -1,0 +1,344 @@@
-       Consumer<ConsumerRecord<K, V>> testHandler;
 +package de.juplo.kafka;
 +
 +import lombok.extern.slf4j.Slf4j;
 +import org.apache.kafka.clients.consumer.ConsumerRecord;
 +import org.apache.kafka.clients.consumer.KafkaConsumer;
 +import org.apache.kafka.clients.producer.KafkaProducer;
 +import org.apache.kafka.clients.producer.ProducerRecord;
 +import org.apache.kafka.common.TopicPartition;
 +import org.apache.kafka.common.errors.RecordDeserializationException;
 +import org.apache.kafka.common.serialization.*;
 +import org.apache.kafka.common.utils.Bytes;
 +import org.junit.jupiter.api.*;
 +import org.springframework.beans.factory.annotation.Autowired;
 +import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer;
 +import org.springframework.boot.test.context.TestConfiguration;
 +import org.springframework.context.annotation.Import;
 +import org.springframework.kafka.test.context.EmbeddedKafka;
 +import org.springframework.test.context.TestPropertySource;
 +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
 +
 +import java.time.Duration;
 +import java.util.*;
 +import java.util.concurrent.ExecutorService;
 +import java.util.function.BiConsumer;
 +import java.util.function.Consumer;
 +import java.util.stream.Collectors;
 +import java.util.stream.IntStream;
 +
 +import static de.juplo.kafka.GenericApplicationTest.PARTITIONS;
 +import static de.juplo.kafka.GenericApplicationTest.TOPIC;
 +import static org.assertj.core.api.Assertions.*;
 +import static org.awaitility.Awaitility.*;
 +
 +
 +@SpringJUnitConfig(initializers = ConfigDataApplicationContextInitializer.class)
 +@TestPropertySource(
 +              properties = {
 +                              "consumer.bootstrap-server=${spring.embedded.kafka.brokers}",
 +                              "consumer.topic=" + TOPIC,
 +                              "consumer.commit-interval=1s" })
 +@EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
 +@Slf4j
 +abstract class GenericApplicationTest<K, V>
 +{
 +      public static final String TOPIC = "FOO";
 +      public static final int PARTITIONS = 10;
 +
 +
 +      @Autowired
 +      KafkaConsumer<K, V> kafkaConsumer;
 +      @Autowired
++      Consumer<ConsumerRecord<K, V>> consumer;
++      @Autowired
 +      ApplicationProperties properties;
 +      @Autowired
 +      ExecutorService executor;
 +
 +      KafkaProducer<Bytes, Bytes> testRecordProducer;
 +      KafkaConsumer<Bytes, Bytes> offsetConsumer;
-               recordGenerator.generate(100, Set.of(), messageSender);
 +      EndlessConsumer<K, V> endlessConsumer;
 +      Map<TopicPartition, Long> oldOffsets;
 +      Map<TopicPartition, Long> newOffsets;
 +      Set<ConsumerRecord<K, V>> receivedRecords;
 +
 +
 +      final RecordGenerator recordGenerator;
 +      final Consumer<ProducerRecord<Bytes, Bytes>> messageSender;
 +
 +      public GenericApplicationTest(RecordGenerator recordGenerator)
 +      {
 +              this.recordGenerator = recordGenerator;
 +              this.messageSender = (record) -> sendMessage(record);
 +      }
 +
 +
 +      /** Tests methods */
 +
 +      @Test
 +      void commitsCurrentOffsetsOnSuccess()
 +      {
-               recordGenerator.generate(100, Set.of(77), messageSender);
++              recordGenerator.generate(100, Set.of(), Set.of(), messageSender);
 +
 +              await("100 records received")
 +                              .atMost(Duration.ofSeconds(30))
 +                              .pollInterval(Duration.ofSeconds(1))
 +                              .until(() -> receivedRecords.size() >= 100);
 +
 +              await("Offsets committed")
 +                              .atMost(Duration.ofSeconds(10))
 +                              .pollInterval(Duration.ofSeconds(1))
 +                              .untilAsserted(() ->
 +                              {
 +                                      checkSeenOffsetsForProgress();
 +                                      compareToCommitedOffsets(newOffsets);
 +                              });
 +
 +              assertThatExceptionOfType(IllegalStateException.class)
 +                              .isThrownBy(() -> endlessConsumer.exitStatus())
 +                              .describedAs("Consumer should still be running");
 +      }
 +
 +      @Test
 +      void commitsOffsetOfErrorForReprocessingOnDeserializationError()
 +      {
-                               Set<Integer> poistionPills,
++              recordGenerator.generate(100, Set.of(77), Set.of(), messageSender);
 +
 +              await("Consumer failed")
 +                              .atMost(Duration.ofSeconds(30))
 +                              .pollInterval(Duration.ofSeconds(1))
 +                              .until(() -> !endlessConsumer.running());
 +
 +              checkSeenOffsetsForProgress();
 +              compareToCommitedOffsets(newOffsets);
 +
 +              endlessConsumer.start();
 +              await("Consumer failed")
 +                              .atMost(Duration.ofSeconds(30))
 +                              .pollInterval(Duration.ofSeconds(1))
 +                              .until(() -> !endlessConsumer.running());
 +
 +              checkSeenOffsetsForProgress();
 +              compareToCommitedOffsets(newOffsets);
 +              assertThat(receivedRecords.size())
 +                              .describedAs("Received not all sent events")
 +                              .isLessThan(100);
 +
 +              assertThatNoException()
 +                              .describedAs("Consumer should not be running")
 +                              .isThrownBy(() -> endlessConsumer.exitStatus());
 +              assertThat(endlessConsumer.exitStatus())
 +                              .describedAs("Consumer should have exited abnormally")
 +                              .containsInstanceOf(RecordDeserializationException.class);
 +      }
 +
++      @Test
++      void doesNotCommitOffsetsOnLogicError()
++      {
++              recordGenerator.generate(100, Set.of(), Set.of(77), messageSender);
++
++              await("Consumer failed")
++                              .atMost(Duration.ofSeconds(30))
++                              .pollInterval(Duration.ofSeconds(1))
++                              .until(() -> !endlessConsumer.running());
++
++              checkSeenOffsetsForProgress();
++              compareToCommitedOffsets(oldOffsets);
++
++              endlessConsumer.start();
++              await("Consumer failed")
++                              .atMost(Duration.ofSeconds(30))
++                              .pollInterval(Duration.ofSeconds(1))
++                              .until(() -> !endlessConsumer.running());
++
++              checkSeenOffsetsForProgress();
++              compareToCommitedOffsets(oldOffsets);
++              assertThat(receivedRecords.size())
++                              .describedAs("Received not all sent events")
++                              .isLessThan(100);
++
++              assertThatNoException()
++                              .describedAs("Consumer should not be running")
++                              .isThrownBy(() -> endlessConsumer.exitStatus());
++              assertThat(endlessConsumer.exitStatus())
++                              .describedAs("Consumer should have exited abnormally")
++                              .containsInstanceOf(RuntimeException.class);
++      }
++
 +
 +      /** Helper methods for the verification of expectations */
 +
 +      void compareToCommitedOffsets(Map<TopicPartition, Long> offsetsToCheck)
 +      {
 +              doForCurrentOffsets((tp, offset) ->
 +              {
 +                      Long expected = offsetsToCheck.get(tp) + 1;
 +                      log.debug("Checking, if the offset for {} is {}", tp, expected);
 +                      assertThat(offset)
 +                                      .describedAs("Committed offset corresponds to the offset of the consumer")
 +                                      .isEqualTo(expected);
 +              });
 +      }
 +
 +      void checkSeenOffsetsForProgress()
 +      {
 +              // Be sure, that some messages were consumed...!
 +              Set<TopicPartition> withProgress = new HashSet<>();
 +              partitions().forEach(tp ->
 +              {
 +                      Long oldOffset = oldOffsets.get(tp) + 1;
 +                      Long newOffset = newOffsets.get(tp) + 1;
 +                      if (!oldOffset.equals(newOffset))
 +                      {
 +                              log.debug("Progress for {}: {} -> {}", tp, oldOffset, newOffset);
 +                              withProgress.add(tp);
 +                      }
 +              });
 +              assertThat(withProgress)
 +                              .describedAs("Some offsets must have changed, compared to the old offset-positions")
 +                              .isNotEmpty();
 +      }
 +
 +
 +      /** Helper methods for setting up and running the tests */
 +
 +      void seekToEnd()
 +      {
 +              offsetConsumer.assign(partitions());
 +              offsetConsumer.seekToEnd(partitions());
 +              partitions().forEach(tp ->
 +              {
 +                      // seekToEnd() works lazily: it only takes effect on poll()/position()
 +                      Long offset = offsetConsumer.position(tp);
 +                      log.info("New position for {}: {}", tp, offset);
 +              });
 +              // The new positions must be commited!
 +              offsetConsumer.commitSync();
 +              offsetConsumer.unsubscribe();
 +      }
 +
 +      void doForCurrentOffsets(BiConsumer<TopicPartition, Long> consumer)
 +      {
 +              offsetConsumer.assign(partitions());
 +              partitions().forEach(tp -> consumer.accept(tp, offsetConsumer.position(tp)));
 +              offsetConsumer.unsubscribe();
 +      }
 +
 +      List<TopicPartition> partitions()
 +      {
 +              return
 +                              IntStream
 +                                              .range(0, PARTITIONS)
 +                                              .mapToObj(partition -> new TopicPartition(TOPIC, partition))
 +                                              .collect(Collectors.toList());
 +      }
 +
 +
 +      public interface RecordGenerator
 +      {
 +              void generate(
 +                              int numberOfMessagesToGenerate,
-               testHandler = record -> {} ;
++                              Set<Integer> poisonPills,
++                              Set<Integer> logicErrors,
 +                              Consumer<ProducerRecord<Bytes, Bytes>> messageSender);
 +      }
 +
 +      void sendMessage(ProducerRecord<Bytes, Bytes> record)
 +      {
 +              testRecordProducer.send(record, (metadata, e) ->
 +              {
 +                      if (metadata != null)
 +                      {
 +                              log.debug(
 +                                              "{}|{} - {}={}",
 +                                              metadata.partition(),
 +                                              metadata.offset(),
 +                                              record.key(),
 +                                              record.value());
 +                      }
 +                      else
 +                      {
 +                              log.warn(
 +                                              "Exception for {}={}: {}",
 +                                              record.key(),
 +                                              record.value(),
 +                                              e.toString());
 +                      }
 +              });
 +      }
 +
 +
 +      @BeforeEach
 +      public void init()
 +      {
 +              Properties props;
 +              props = new Properties();
 +              props.put("bootstrap.servers", properties.getBootstrapServer());
 +              props.put("linger.ms", 100);
 +              props.put("key.serializer", BytesSerializer.class.getName());
 +              props.put("value.serializer", BytesSerializer.class.getName());
 +              testRecordProducer = new KafkaProducer<>(props);
 +
 +              props = new Properties();
 +              props.put("bootstrap.servers", properties.getBootstrapServer());
 +              props.put("client.id", "OFFSET-CONSUMER");
 +              props.put("group.id", properties.getGroupId());
 +              props.put("key.deserializer", BytesDeserializer.class.getName());
 +              props.put("value.deserializer", BytesDeserializer.class.getName());
 +              offsetConsumer = new KafkaConsumer<>(props);
 +
-                                       testHandler.accept(record);
 +              seekToEnd();
 +
 +              oldOffsets = new HashMap<>();
 +              newOffsets = new HashMap<>();
 +              receivedRecords = new HashSet<>();
 +
 +              doForCurrentOffsets((tp, offset) ->
 +              {
 +                      oldOffsets.put(tp, offset - 1);
 +                      newOffsets.put(tp, offset - 1);
 +              });
 +
 +              Consumer<ConsumerRecord<K, V>> captureOffsetAndExecuteTestHandler =
 +                              record ->
 +                              {
 +                                      newOffsets.put(
 +                                                      new TopicPartition(record.topic(), record.partition()),
 +                                                      record.offset());
 +                                      receivedRecords.add(record);
-       public static class Configuration {}
++                                      consumer.accept(record);
 +                              };
 +
 +              endlessConsumer =
 +                              new EndlessConsumer<>(
 +                                              executor,
 +                                              properties.getClientId(),
 +                                              properties.getTopic(),
 +                                              kafkaConsumer,
 +                                              captureOffsetAndExecuteTestHandler);
 +
 +              endlessConsumer.start();
 +      }
 +
 +      @AfterEach
 +      public void deinit()
 +      {
 +              try
 +              {
 +                      endlessConsumer.stop();
 +                      testRecordProducer.close();
 +                      offsetConsumer.close();
 +              }
 +              catch (Exception e)
 +              {
 +                      log.info("Exception while stopping the consumer: {}", e.toString());
 +              }
 +      }
 +
 +
 +      @TestConfiguration
 +      @Import(ApplicationConfiguration.class)
++      public static class Configuration
++      {
++      }
 +}