Merged test-case improvements (branch 'deserialization')
author Kai Moritz <kai@juplo.de>
Tue, 26 Jul 2022 14:11:45 +0000 (16:11 +0200)
committer Kai Moritz <kai@juplo.de>
Tue, 26 Jul 2022 14:11:45 +0000 (16:11 +0200)
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/test/java/de/juplo/kafka/ApplicationTests.java

@@@ -2,11 -2,11 +2,11 @@@ package de.juplo.kafka
  
  import org.apache.kafka.clients.consumer.ConsumerRecord;
  import org.apache.kafka.clients.consumer.KafkaConsumer;
 -import org.apache.kafka.common.serialization.LongDeserializer;
  import org.apache.kafka.common.serialization.StringDeserializer;
  import org.springframework.boot.context.properties.EnableConfigurationProperties;
  import org.springframework.context.annotation.Bean;
  import org.springframework.context.annotation.Configuration;
 +import org.springframework.kafka.support.serializer.JsonDeserializer;
  
  import java.util.Properties;
  import java.util.concurrent.ExecutorService;
@@@ -19,7 -19,7 +19,7 @@@ import java.util.function.Consumer
  public class ApplicationConfiguration
  {
    @Bean
 -  public Consumer<ConsumerRecord<String, Long>> consumer()
 +  public Consumer<ConsumerRecord<String, ValidMessage>> consumer()
    {
      return (record) ->
      {
    }
  
    @Bean
 -  public EndlessConsumer<String, Long> endlessConsumer(
 -      KafkaConsumer<String, Long> kafkaConsumer,
 +  public EndlessConsumer<String, ValidMessage> endlessConsumer(
 +      KafkaConsumer<String, ValidMessage> kafkaConsumer,
        ExecutorService executor,
 -      Consumer<ConsumerRecord<String, Long>> handler,
 +      Consumer<ConsumerRecord<String, ValidMessage>> handler,
        ApplicationProperties properties)
    {
      return
@@@ -50,7 -50,7 +50,7 @@@
    }
  
    @Bean(destroyMethod = "close")
 -  public KafkaConsumer<String, Long> kafkaConsumer(ApplicationProperties properties)
 +  public KafkaConsumer<String, ValidMessage> kafkaConsumer(ApplicationProperties properties)
    {
      Properties props = new Properties();
  
      props.put("group.id", properties.getGroupId());
      props.put("client.id", properties.getClientId());
      props.put("auto.offset.reset", properties.getAutoOffsetReset());
+     props.put("auto.commit.interval.ms", (int)properties.getCommitInterval().toMillis());
      props.put("metadata.max.age.ms", "1000");
      props.put("key.deserializer", StringDeserializer.class.getName());
 -    props.put("value.deserializer", LongDeserializer.class.getName());
 +    props.put("value.deserializer", JsonDeserializer.class.getName());
 +    props.put(JsonDeserializer.TYPE_MAPPINGS,
 +        "message:" + ClientMessage.class.getName() + "," +
 +        "greeting:" + Greeting.class.getName());
 +    props.put(JsonDeserializer.TRUSTED_PACKAGES, "de.juplo.kafka");
  
      return new KafkaConsumer<>(props);
    }
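
Note: this hunk maps the logical type names "message" and "greeting" onto ClientMessage and Greeting, which the consumer handles through the common supertype ValidMessage. Those classes are not part of this merge; the following is only a rough sketch of what they might look like, with the field names guessed from the test helpers further down (everything except the class names is an assumption):

  // Hypothetical sketch - only the names ValidMessage, ClientMessage and Greeting appear in the diff.
  public interface ValidMessage {}

  public class ClientMessage implements ValidMessage
  {
    private String client;   // assumed field
    private String message;  // assumed field
    // getters/setters omitted
  }

  public class Greeting implements ValidMessage
  {
    private String name;                   // assumed field
    private java.time.LocalDateTime when;  // assumed field
    // getters/setters omitted
  }
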
@@@ -15,13 -15,11 +15,13 @@@ import org.springframework.boot.test.co
  import org.springframework.boot.test.context.TestConfiguration;
  import org.springframework.context.annotation.Bean;
  import org.springframework.context.annotation.Import;
 +import org.springframework.kafka.support.serializer.JsonSerializer;
  import org.springframework.kafka.test.context.EmbeddedKafka;
  import org.springframework.test.context.TestPropertySource;
  import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
  
  import java.time.Duration;
 +import java.time.LocalDateTime;
  import java.util.*;
  import java.util.concurrent.ExecutionException;
  import java.util.concurrent.ExecutorService;
@@@ -41,7 -39,8 +41,8 @@@ import static org.awaitility.Awaitility
  @TestPropertySource(
                properties = {
                                "consumer.bootstrap-server=${spring.embedded.kafka.brokers}",
-                               "consumer.topic=" + TOPIC })
+                               "consumer.topic=" + TOPIC,
+                               "consumer.commit-interval=1s" })
  @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
  @Slf4j
  class ApplicationTests
@@@ -57,7 -56,7 +58,7 @@@
        @Autowired
        KafkaProducer<String, Bytes> kafkaProducer;
        @Autowired
 -      KafkaConsumer<String, Long> kafkaConsumer;
 +      KafkaConsumer<String, ValidMessage> kafkaConsumer;
        @Autowired
        KafkaConsumer<Bytes, Bytes> offsetConsumer;
        @Autowired
        @Autowired
        ExecutorService executor;
  
 -      Consumer<ConsumerRecord<String, Long>> testHandler;
 -      EndlessConsumer<String, Long> endlessConsumer;
 +      Consumer<ConsumerRecord<String, ValidMessage>> testHandler;
 +      EndlessConsumer<String, ValidMessage> endlessConsumer;
        Map<TopicPartition, Long> oldOffsets;
        Map<TopicPartition, Long> newOffsets;
 -      Set<ConsumerRecord<String, Long>> receivedRecords;
 +      Set<ConsumerRecord<String, ValidMessage>> receivedRecords;
  
  
        /** Tests methods */
  
        @Test
-       @Order(1) // << The poison pill is not skipped. Hence, this test must run first
        void commitsCurrentOffsetsOnSuccess() throws ExecutionException, InterruptedException
        {
                send100Messages((partition, key, counter) ->
                {
 -                      Bytes value = new Bytes(valueSerializer.serialize(TOPIC, counter));
 -                      return new ProducerRecord<>(TOPIC, partition, key, value);
 +                      Bytes value;
 +                      String type;
 +
 +                      if (counter%3 != 0)
 +                      {
 +                              value = serializeClientMessage(key, counter);
 +                              type = "message";
 +                      }
 +                      else {
 +                              value = serializeGreeting(key, counter);
 +                              type = "greeting";
 +                      }
 +
 +                      return toRecord(partition, key, value, type);
                });
  
                await("100 records received")
                                .atMost(Duration.ofSeconds(30))
+                               .pollInterval(Duration.ofSeconds(1))
                                .until(() -> receivedRecords.size() >= 100);
  
                await("Offsets committed")
                                .atMost(Duration.ofSeconds(10))
+                               .pollInterval(Duration.ofSeconds(1))
                                .untilAsserted(() ->
                                {
                                        checkSeenOffsetsForProgress();
        }
  
        @Test
-       @Order(2)
-       void commitsOffsetOfErrorForReprocessingOnError()
+       void commitsOffsetOfErrorForReprocessingOnDeserializationError()
        {
                send100Messages((partition, key, counter) ->
                {
 -                      Bytes value = counter == 77
 -                                      ? new Bytes(stringSerializer.serialize(TOPIC, "BOOM!"))
 -                                      : new Bytes(valueSerializer.serialize(TOPIC, counter));
 -                      return new ProducerRecord<>(TOPIC, partition, key, value);
 +                      Bytes value;
 +                      String type;
 +
 +                      if (counter == 77)
 +                      {
 +                              value = serializeFooMessage(key, counter);
 +                              type = "foo";
 +                      }
 +                      else
 +                      {
 +                              if (counter%3 != 0)
 +                              {
 +                                      value = serializeClientMessage(key, counter);
 +                                      type = "message";
 +                              }
 +                              else {
 +                                      value = serializeGreeting(key, counter);
 +                                      type = "greeting";
 +                              }
 +                      }
 +
 +                      return toRecord(partition, key, value, type);
                });
  
                await("Consumer failed")
                                .atMost(Duration.ofSeconds(30))
+                               .pollInterval(Duration.ofSeconds(1))
                                .until(() -> !endlessConsumer.running());
  
                checkSeenOffsetsForProgress();
                endlessConsumer.start();
                await("Consumer failed")
                                .atMost(Duration.ofSeconds(30))
+                               .pollInterval(Duration.ofSeconds(1))
                                .until(() -> !endlessConsumer.running());
  
                checkSeenOffsetsForProgress();
                Set<TopicPartition> withProgress = new HashSet<>();
                partitions().forEach(tp ->
                {
-                       Long oldOffset = oldOffsets.get(tp);
-                       Long newOffset = newOffsets.get(tp);
+                       Long oldOffset = oldOffsets.get(tp) + 1;
+                       Long newOffset = newOffsets.get(tp) + 1;
                        if (!oldOffset.equals(newOffset))
                        {
                                log.debug("Progress for {}: {} -> {}", tp, oldOffset, newOffset);
  
        /** Helper methods for setting up and running the tests */
  
+       void seekToEnd()
+       {
+               offsetConsumer.assign(partitions());
+               offsetConsumer.seekToEnd(partitions());
+               partitions().forEach(tp ->
+               {
+                       // seekToEnd() works lazily: it only takes effect on poll()/position()
+                       Long offset = offsetConsumer.position(tp);
+                       log.info("New position for {}: {}", tp, offset);
+               });
+               // The new positions must be committed!
+               offsetConsumer.commitSync();
+               offsetConsumer.unsubscribe();
+       }
        void doForCurrentOffsets(BiConsumer<TopicPartition, Long> consumer)
        {
                offsetConsumer.assign(partitions());
                }
        }
  
 +      ProducerRecord<String, Bytes> toRecord(int partition, String key, Bytes value, String type)
 +      {
 +              ProducerRecord<String, Bytes> record =
 +                              new ProducerRecord<>(TOPIC, partition, key, value);
 +              record.headers().add("__TypeId__", type.getBytes());
 +              return record;
 +      }
 +
 +      Bytes serializeClientMessage(String key, Long value)
 +      {
 +              TestClientMessage message = new TestClientMessage(key, value.toString());
 +              return new Bytes(valueSerializer.serialize(TOPIC, message));
 +      }
 +
 +      Bytes serializeGreeting(String key, Long value)
 +      {
 +              TestGreeting message = new TestGreeting(key, LocalDateTime.now());
 +              return new Bytes(valueSerializer.serialize(TOPIC, message));
 +      }
 +
 +      Bytes serializeFooMessage(String key, Long value)
 +      {
 +              TestFooMessage message = new TestFooMessage(key, value);
 +              return new Bytes(valueSerializer.serialize(TOPIC, message));
 +      }
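
Note: the helpers above rely on two things that are outside this diff. First, toRecord() sets the "__TypeId__" header, which Spring Kafka's JsonDeserializer matches against the type mappings configured in ApplicationConfiguration; since only "message" and "greeting" are mapped, the "foo" record produced for counter 77 cannot be resolved and triggers the deserialization error that the second test expects. Second, the TestClientMessage, TestGreeting and TestFooMessage classes are not shown either; a minimal sketch that matches the constructor calls above (all field names are assumptions):

  // Hypothetical test DTOs - only the constructor shapes are taken from the helpers above.
  public class TestClientMessage
  {
    public final String client;
    public final String message;
    public TestClientMessage(String client, String message) { this.client = client; this.message = message; }
  }

  public class TestGreeting
  {
    public final String name;
    public final java.time.LocalDateTime when;
    public TestGreeting(String name, java.time.LocalDateTime when) { this.name = name; this.when = when; }
  }

  public class TestFooMessage
  {
    public final String client;
    public final Long counter;
    public TestFooMessage(String client, Long counter) { this.client = client; this.counter = counter; }
  }
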
  
        @BeforeEach
        public void init()
        {
                testHandler = record -> {} ;
  
+               seekToEnd();
                oldOffsets = new HashMap<>();
                newOffsets = new HashMap<>();
                receivedRecords = new HashSet<>();
                        newOffsets.put(tp, offset - 1);
                });
  
 -              Consumer<ConsumerRecord<String, Long>> captureOffsetAndExecuteTestHandler =
 +              Consumer<ConsumerRecord<String, ValidMessage>> captureOffsetAndExecuteTestHandler =
                                record ->
                                {
                                        newOffsets.put(
        public static class Configuration
        {
                @Bean
 -              Serializer<Long> serializer()
 +              Serializer<ValidMessage> serializer()
                {
 -                      return new LongSerializer();
 +                      return new JsonSerializer<>();
                }
  
                @Bean