Improved tests and fixes merged: stored-offsets -> stored-state
author    Kai Moritz <kai@juplo.de>
Sun, 14 Aug 2022 19:23:49 +0000 (21:23 +0200)
committer Kai Moritz <kai@juplo.de>
Sun, 14 Aug 2022 19:23:49 +0000 (21:23 +0200)
README.sh
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java
src/main/java/de/juplo/kafka/EndlessConsumer.java
src/main/java/de/juplo/kafka/StateDocument.java
src/test/java/de/juplo/kafka/GenericApplicationTests.java

diff --cc README.sh
+++ b/README.sh
@@@ -24,65 -25,17 +25,65 @@@ f
  
  echo "Waiting for the Kafka-Cluster to become ready..."
  docker-compose exec cli cub kafka-ready -b kafka:9092 1 60 > /dev/null 2>&1 || exit 1
 -docker-compose up setup
 -docker-compose up -d producer peter beate
 +docker-compose up -d kafka-ui
  
 -sleep 15
 +docker-compose exec -T cli bash << 'EOF'
 +echo "Creating topic with 3 partitions..."
 +kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic test
 +# tag::createtopic[]
 +kafka-topics --bootstrap-server kafka:9092 --create --topic test --partitions 3
 +# end::createtopic[]
 +kafka-topics --bootstrap-server kafka:9092 --describe --topic test
 +EOF
  
 -http -v post :8082/stop
 +docker-compose up -d consumer
 +
 +docker-compose up -d producer
  sleep 10
- http -v :8081/seen
 -docker-compose kill -s 9 peter
 -http -v post :8082/start
 -sleep 60
++http -v :8081/state
 +sleep 1
- http -v :8081/seen
++http -v :8081/state
 +sleep 1
- http -v :8081/seen
++http -v :8081/state
 +sleep 1
- http -v :8081/seen
++http -v :8081/state
 +
 +docker-compose stop producer
 +docker-compose exec -T cli bash << 'EOF'
 +echo "Altering number of partitions from 3 to 7..."
 +# tag::altertopic[]
 +kafka-topics --bootstrap-server kafka:9092 --alter --topic test --partitions 7
 +kafka-topics --bootstrap-server kafka:9092 --describe --topic test
 +# end::altertopic[]
 +EOF
  
 -docker-compose stop producer peter beate
 -docker-compose logs beate
 -docker-compose logs --tail=10 peter
 +docker-compose start producer
 +sleep 1
- http -v :8081/seen
++http -v :8081/state
 +sleep 1
- http -v :8081/seen
++http -v :8081/state
 +sleep 1
- http -v :8081/seen
++http -v :8081/state
 +sleep 1
- http -v :8081/seen
++http -v :8081/state
 +sleep 1
- http -v :8081/seen
++http -v :8081/state
 +sleep 1
- http -v :8081/seen
++http -v :8081/state
 +sleep 1
- http -v :8081/seen
++http -v :8081/state
 +sleep 1
- http -v :8081/seen
++http -v :8081/state
 +sleep 1
- http -v :8081/seen
++http -v :8081/state
 +sleep 1
- http -v :8081/seen
++http -v :8081/state
 +sleep 1
- http -v :8081/seen
++http -v :8081/state
 +sleep 1
- http -v :8081/seen
++http -v :8081/state
 +sleep 1
- http -v :8081/seen
++http -v :8081/state
 +sleep 1
- http -v :8081/seen
++http -v :8081/state
 +docker-compose stop producer consumer
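
Note: README.sh drives the topic setup through the kafka-topics CLI inside the cli container. The same setup can also be done programmatically; the following is a minimal sketch using Kafka's AdminClient (not part of this repository, the bootstrap address is taken from the script above):

import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;

public class TopicSetupSketch
{
  public static void main(String[] args) throws Exception
  {
    Properties props = new Properties();
    props.put("bootstrap.servers", "kafka:9092");

    try (AdminClient admin = AdminClient.create(props))
    {
      // Create topic "test" with 3 partitions and replication-factor 1,
      // mirroring the kafka-topics --create call above
      admin.createTopics(List.of(new NewTopic("test", 3, (short) 1))).all().get();

      // Raise the partition count from 3 to 7,
      // mirroring the kafka-topics --alter call above
      admin.createPartitions(Map.of("test", NewPartitions.increaseTo(7))).all().get();
    }
  }
}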
diff --cc src/main/java/de/juplo/kafka/ApplicationConfiguration.java
@@@ -24,17 -25,20 +24,17 @@@ public class ApplicationConfiguration
    }
  
    @Bean
-   public KeyCountingRebalanceListener keyCountingRebalanceListener(
-       KeyCountingRecordHandler keyCountingRecordHandler,
-       PartitionStatisticsRepository repository,
+   public ApplicationRebalanceListener rebalanceListener(
+       ApplicationRecordHandler recordHandler,
+       StateRepository stateRepository,
 -      Consumer<String, Long> consumer,
        ApplicationProperties properties)
    {
-     return new KeyCountingRebalanceListener(
-         keyCountingRecordHandler,
-         repository,
+     return new ApplicationRebalanceListener(
+         recordHandler,
+         stateRepository,
          properties.getClientId(),
 -        properties.getTopic(),
          Clock.systemDefaultZone(),
 -        properties.getCommitInterval(),
 -        consumer);
 +        properties.getCommitInterval());
    }
  
    @Bean
diff --cc src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java
index 0000000,444b7b7..247b6f7
mode 000000,100644..100644
--- /dev/null
@@@ -1,0 -1,108 +1,76 @@@
 -import org.apache.kafka.clients.consumer.Consumer;
+ package de.juplo.kafka;
+ import lombok.RequiredArgsConstructor;
+ import lombok.extern.slf4j.Slf4j;
 -  private final String topic;
+ import org.apache.kafka.common.TopicPartition;
+ import java.time.Clock;
+ import java.time.Duration;
+ import java.time.Instant;
+ import java.util.Collection;
+ import java.util.Map;
+ @RequiredArgsConstructor
+ @Slf4j
+ public class ApplicationRebalanceListener implements PollIntervalAwareConsumerRebalanceListener
+ {
+   private final ApplicationRecordHandler recordHandler;
+   private final StateRepository stateRepository;
+   private final String id;
 -  private final Consumer<String, Long> consumer;
+   private final Clock clock;
+   private final Duration commitInterval;
 -  private boolean commitsEnabled = true;
+   private Instant lastCommit = Instant.EPOCH;
 -      log.info("{} - adding partition: {}, offset={}", id, partition, document.offset);
 -      if (document.offset >= 0)
 -      {
 -        // Only seek, if a stored offset was found
 -        // Otherwise: Use initial offset, generated by Kafka
 -        consumer.seek(tp, document.offset);
 -      }
+   @Override
+   public void onPartitionsAssigned(Collection<TopicPartition> partitions)
+   {
+     partitions.forEach(tp ->
+     {
+       Integer partition = tp.partition();
++      log.info("{} - adding partition: {}", id, partition);
+       StateDocument document =
+           stateRepository
+               .findById(Integer.toString(partition))
+               .orElse(new StateDocument(partition));
 -      Long offset = consumer.position(tp);
 -      log.info(
 -          "{} - removing partition: {}, offset of next message {})",
 -          id,
 -          partition,
 -          offset);
 -      if (commitsEnabled)
 -      {
 -        Map<String, Long> removed = recordHandler.removePartition(partition);
 -        stateRepository.save(new StateDocument(partition, removed, offset));
 -      }
 -      else
+       recordHandler.addPartition(partition, document.state);
+     });
+   }
+   @Override
+   public void onPartitionsRevoked(Collection<TopicPartition> partitions)
+   {
+     partitions.forEach(tp ->
+     {
+       Integer partition = tp.partition();
 -        log.info("Offset commits are disabled! Last commit: {}", lastCommit);
++      log.info("{} - removing partition: {}", id, partition);
++      Map<String, Long> removed = recordHandler.removePartition(partition);
++      for (String key : removed.keySet())
+       {
 -    if (!commitsEnabled)
 -    {
 -      log.info("Offset commits are disabled! Last commit: {}", lastCommit);
 -      return;
 -    }
 -
++        log.info(
++            "{} - Seen {} messages for partition={}|key={}",
++            id,
++            removed.get(key),
++            partition,
++            key);
+       }
++      stateRepository.save(new StateDocument(partition, removed));
+     });
+   }
+   @Override
+   public void beforeNextPoll()
+   {
 -      log.debug("Storing data and offsets, last commit: {}", lastCommit);
 -      recordHandler.getState().forEach((partiton, state) -> stateRepository.save(
+     if (lastCommit.plus(commitInterval).isBefore(clock.instant()))
+     {
 -              state,
 -              consumer.position(new TopicPartition(topic, partiton)))));
++      log.debug("Storing data, last commit: {}", lastCommit);
++      recordHandler.getState().forEach((partiton, statistics) -> stateRepository.save(
+           new StateDocument(
+               partiton,
 -
 -  @Override
 -  public void enableCommits()
 -  {
 -    commitsEnabled = true;
 -  }
 -
 -  @Override
 -  public void disableCommits()
 -  {
 -    commitsEnabled = false;
 -  }
++              statistics)));
+       lastCommit = clock.instant();
+     }
+   }
+ }
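
The rebalance listener above loads and stores the per-partition state through StateRepository, which is not touched by this merge and therefore not shown. Judging from the calls to findById() and save(), it is presumably the usual Spring Data MongoDB repository interface; a hypothetical sketch:

package de.juplo.kafka;

import org.springframework.data.mongodb.repository.MongoRepository;

// Hypothetical sketch - the actual declaration is not part of this diff.
// findById(String) and save(StateDocument), as used by the listener above,
// are inherited from MongoRepository.
public interface StateRepository extends MongoRepository<StateDocument, String>
{
}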
diff --cc src/main/java/de/juplo/kafka/EndlessConsumer.java
@@@ -42,7 -42,8 +42,7 @@@ public class EndlessConsumer<K, V> impl
      try
      {
        log.info("{} - Subscribing to topic {}", id, topic);
-       consumer.subscribe(Arrays.asList(topic), pollIntervalAwareRebalanceListener);
 -      rebalanceListener.enableCommits();
+       consumer.subscribe(Arrays.asList(topic), rebalanceListener);
  
        while (true)
        {
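
The listener only takes effect because it is handed to subscribe(); Kafka invokes onPartitionsAssigned()/onPartitionsRevoked() synchronously on the polling thread, from inside poll(). A minimal standalone sketch of that contract (topic name, group id and broker address are placeholders, not taken from the project):

import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SubscribeSketch
{
  public static void main(String[] args)
  {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "sketch");
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", StringDeserializer.class.getName());

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props))
    {
      consumer.subscribe(List.of("my-topic"), new ConsumerRebalanceListener()
      {
        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions)
        {
          // Runs inside poll() after a rebalance: restore state here
          System.out.println("assigned: " + partitions);
        }

        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions)
        {
          // Runs inside poll() before partitions are taken away: save state here
          System.out.println("revoked: " + partitions);
        }
      });

      while (true)
      {
        consumer.poll(Duration.ofSeconds(1));
      }
    }
  }
}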
diff --cc src/main/java/de/juplo/kafka/StateDocument.java
index 0000000,bb1c701..b37b8a9
mode 000000,100644..100644
--- /dev/null
@@@ -1,0 -1,36 +1,34 @@@
 -  public long offset = -1l;
+ package de.juplo.kafka;
+ import lombok.ToString;
+ import org.springframework.data.annotation.Id;
+ import org.springframework.data.mongodb.core.mapping.Document;
+ import java.util.HashMap;
+ import java.util.Map;
+ @Document(collection = "state")
+ @ToString
+ public class StateDocument
+ {
+   @Id
+   public String id;
 -  public StateDocument(Integer partition, Map<String, Long> state, long offset)
+   public Map<String, Long> state;
+   public StateDocument()
+   {
+   }
+   public StateDocument(Integer partition)
+   {
+     this.id = Integer.toString(partition);
+     this.state = new HashMap<>();
+   }
 -    this.offset = offset;
++  public StateDocument(Integer partition, Map<String, Long> state)
+   {
+     this.id = Integer.toString(partition);
+     this.state = state;
+   }
+ }
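
For orientation, what ends up in MongoDB with this document class: one document per partition in the "state" collection, keyed by the partition number, with the per-key counters as the state map. A small hypothetical usage sketch (keys and counts are invented for illustration):

// Hypothetical sketch - keys and counts are invented for illustration.
void storeAndReload(StateRepository stateRepository)
{
  StateDocument document = new StateDocument(0, java.util.Map.of("peter", 3L, "beate", 5L));
  stateRepository.save(document);   // persisted with _id "0" in the "state" collection

  java.util.Map<String, Long> restored =
      stateRepository.findById("0")
          .orElse(new StateDocument(0))
          .state;
}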
diff --cc src/test/java/de/juplo/kafka/GenericApplicationTests.java
index 0000000,fa3d911..a64ebd0
mode 000000,100644..100644
--- /dev/null
@@@ -1,0 -1,390 +1,382 @@@
 -      StateRepository stateRepository;
 -      @Autowired
+ package de.juplo.kafka;
+ import lombok.extern.slf4j.Slf4j;
+ import org.apache.kafka.clients.consumer.ConsumerRecord;
+ import org.apache.kafka.clients.consumer.KafkaConsumer;
+ import org.apache.kafka.clients.producer.KafkaProducer;
+ import org.apache.kafka.clients.producer.ProducerRecord;
+ import org.apache.kafka.common.TopicPartition;
+ import org.apache.kafka.common.errors.RecordDeserializationException;
+ import org.apache.kafka.common.serialization.*;
+ import org.apache.kafka.common.utils.Bytes;
+ import org.junit.jupiter.api.*;
+ import org.springframework.beans.factory.annotation.Autowired;
+ import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+ import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo;
+ import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer;
+ import org.springframework.boot.test.context.TestConfiguration;
+ import org.springframework.context.annotation.Import;
+ import org.springframework.kafka.test.context.EmbeddedKafka;
+ import org.springframework.test.context.TestPropertySource;
+ import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
+ import java.time.Duration;
+ import java.util.*;
+ import java.util.concurrent.ExecutorService;
+ import java.util.function.BiConsumer;
+ import java.util.function.Consumer;
+ import java.util.stream.Collectors;
+ import java.util.stream.IntStream;
+ import static de.juplo.kafka.GenericApplicationTests.PARTITIONS;
+ import static de.juplo.kafka.GenericApplicationTests.TOPIC;
+ import static org.assertj.core.api.Assertions.*;
+ import static org.awaitility.Awaitility.*;
+ @SpringJUnitConfig(initializers = ConfigDataApplicationContextInitializer.class)
+ @TestPropertySource(
+               properties = {
+                               "consumer.bootstrap-server=${spring.embedded.kafka.brokers}",
+                               "consumer.topic=" + TOPIC,
+                               "consumer.commit-interval=1s",
+                               "spring.mongodb.embedded.version=4.4.13" })
+ @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
+ @EnableAutoConfiguration
+ @AutoConfigureDataMongo
+ @Slf4j
+ abstract class GenericApplicationTests<K, V>
+ {
+       public static final String TOPIC = "FOO";
+       public static final int PARTITIONS = 10;
+       @Autowired
+       KafkaConsumer<K, V> kafkaConsumer;
+       @Autowired
+       Consumer<ConsumerRecord<K, V>> consumer;
+       @Autowired
+       ApplicationProperties properties;
+       @Autowired
+       ExecutorService executor;
+       @Autowired
 -                      Integer partition = tp.partition();
 -                      StateDocument document =
 -                                      stateRepository
 -                                                      .findById(partition.toString())
 -                                                      .orElse(new StateDocument(partition));
 -                      document.offset = offset;
 -                      stateRepository.save(document);
+       PollIntervalAwareConsumerRebalanceListener rebalanceListener;
+       @Autowired
+       RecordHandler<K, V> recordHandler;
+       KafkaProducer<Bytes, Bytes> testRecordProducer;
+       KafkaConsumer<Bytes, Bytes> offsetConsumer;
+       EndlessConsumer<K, V> endlessConsumer;
+       Map<TopicPartition, Long> oldOffsets;
+       Map<TopicPartition, Long> newOffsets;
+       Set<ConsumerRecord<K, V>> receivedRecords;
+       final RecordGenerator recordGenerator;
+       final Consumer<ProducerRecord<Bytes, Bytes>> messageSender;
+       public GenericApplicationTests(RecordGenerator recordGenerator)
+       {
+               this.recordGenerator = recordGenerator;
+               this.messageSender = (record) -> sendMessage(record);
+       }
+       /** Tests methods */
+       @Test
+       void commitsCurrentOffsetsOnSuccess()
+       {
+               int numberOfGeneratedMessages =
+                               recordGenerator.generate(false, false, messageSender);
+               await(numberOfGeneratedMessages + " records received")
+                               .atMost(Duration.ofSeconds(30))
+                               .pollInterval(Duration.ofSeconds(1))
+                               .until(() -> receivedRecords.size() >= numberOfGeneratedMessages);
+               await("Offsets committed")
+                               .atMost(Duration.ofSeconds(10))
+                               .pollInterval(Duration.ofSeconds(1))
+                               .untilAsserted(() ->
+                               {
+                                       checkSeenOffsetsForProgress();
+                                       compareToCommitedOffsets(newOffsets);
+                               });
+               assertThatExceptionOfType(IllegalStateException.class)
+                               .isThrownBy(() -> endlessConsumer.exitStatus())
+                               .describedAs("Consumer should still be running");
+               recordGenerator.assertBusinessLogic();
+       }
+       @Test
+       @SkipWhenErrorCannotBeGenerated(poisonPill = true)
+       void commitsOffsetOfErrorForReprocessingOnDeserializationError()
+       {
+               int numberOfGeneratedMessages =
+                               recordGenerator.generate(true, false, messageSender);
+               await("Consumer failed")
+                               .atMost(Duration.ofSeconds(30))
+                               .pollInterval(Duration.ofSeconds(1))
+                               .until(() -> !endlessConsumer.running());
+               checkSeenOffsetsForProgress();
+               compareToCommitedOffsets(newOffsets);
+               endlessConsumer.start();
+               await("Consumer failed")
+                               .atMost(Duration.ofSeconds(30))
+                               .pollInterval(Duration.ofSeconds(1))
+                               .until(() -> !endlessConsumer.running());
+               checkSeenOffsetsForProgress();
+               compareToCommitedOffsets(newOffsets);
+               assertThat(receivedRecords.size())
+                               .describedAs("Received not all sent events")
+                               .isLessThan(numberOfGeneratedMessages);
+               assertThatNoException()
+                               .describedAs("Consumer should not be running")
+                               .isThrownBy(() -> endlessConsumer.exitStatus());
+               assertThat(endlessConsumer.exitStatus())
+                               .describedAs("Consumer should have exited abnormally")
+                               .containsInstanceOf(RecordDeserializationException.class);
+               recordGenerator.assertBusinessLogic();
+       }
+       @Test
+       @SkipWhenErrorCannotBeGenerated(logicError = true)
+       void doesNotCommitOffsetsOnLogicError()
+       {
+               int numberOfGeneratedMessages =
+                               recordGenerator.generate(false, true, messageSender);
+               await("Consumer failed")
+                               .atMost(Duration.ofSeconds(30))
+                               .pollInterval(Duration.ofSeconds(1))
+                               .until(() -> !endlessConsumer.running());
+               checkSeenOffsetsForProgress();
+               compareToCommitedOffsets(oldOffsets);
+               endlessConsumer.start();
+               await("Consumer failed")
+                               .atMost(Duration.ofSeconds(30))
+                               .pollInterval(Duration.ofSeconds(1))
+                               .until(() -> !endlessConsumer.running());
+               checkSeenOffsetsForProgress();
+               compareToCommitedOffsets(oldOffsets);
+               assertThat(receivedRecords.size())
+                               .describedAs("Received not all sent events")
+                               .isLessThan(numberOfGeneratedMessages);
+               assertThatNoException()
+                               .describedAs("Consumer should not be running")
+                               .isThrownBy(() -> endlessConsumer.exitStatus());
+               assertThat(endlessConsumer.exitStatus())
+                               .describedAs("Consumer should have exited abnormally")
+                               .containsInstanceOf(RuntimeException.class);
+               recordGenerator.assertBusinessLogic();
+       }
+       /** Helper methods for the verification of expectations */
+       void compareToCommitedOffsets(Map<TopicPartition, Long> offsetsToCheck)
+       {
+               doForCurrentOffsets((tp, offset) ->
+               {
+                       Long expected = offsetsToCheck.get(tp) + 1;
+                       log.debug("Checking, if the offset for {} is {}", tp, expected);
+                       assertThat(offset)
+                                       .describedAs("Committed offset corresponds to the offset of the consumer")
+                                       .isEqualTo(expected);
+               });
+       }
+       void checkSeenOffsetsForProgress()
+       {
+               // Be sure, that some messages were consumed...!
+               Set<TopicPartition> withProgress = new HashSet<>();
+               partitions().forEach(tp ->
+               {
+                       Long oldOffset = oldOffsets.get(tp) + 1;
+                       Long newOffset = newOffsets.get(tp) + 1;
+                       if (!oldOffset.equals(newOffset))
+                       {
+                               log.debug("Progress for {}: {} -> {}", tp, oldOffset, newOffset);
+                               withProgress.add(tp);
+                       }
+               });
+               assertThat(withProgress)
+                               .describedAs("Some offsets must have changed, compared to the old offset-positions")
+                               .isNotEmpty();
+       }
+       /** Helper methods for setting up and running the tests */
+       void seekToEnd()
+       {
+               offsetConsumer.assign(partitions());
++              offsetConsumer.seekToEnd(partitions());
+               partitions().forEach(tp ->
+               {
++                      // seekToEnd() works lazily: it only takes effect on poll()/position()
+                       Long offset = offsetConsumer.position(tp);
+                       log.info("New position for {}: {}", tp, offset);
 -              partitions().forEach(tp ->
 -              {
 -                      String partition = Integer.toString(tp.partition());
 -                      Optional<Long> offset = stateRepository.findById(partition).map(document -> document.offset);
 -                      consumer.accept(tp, offset.orElse(0l));
 -              });
+               });
++              // The new positions must be commited!
++              offsetConsumer.commitSync();
+               offsetConsumer.unsubscribe();
+       }
+       void doForCurrentOffsets(BiConsumer<TopicPartition, Long> consumer)
+       {
++              offsetConsumer.assign(partitions());
++              partitions().forEach(tp -> consumer.accept(tp, offsetConsumer.position(tp)));
++              offsetConsumer.unsubscribe();
+       }
+       List<TopicPartition> partitions()
+       {
+               return
+                               IntStream
+                                               .range(0, PARTITIONS)
+                                               .mapToObj(partition -> new TopicPartition(TOPIC, partition))
+                                               .collect(Collectors.toList());
+       }
+       public interface RecordGenerator
+       {
+               int generate(
+                               boolean poisonPills,
+                               boolean logicErrors,
+                               Consumer<ProducerRecord<Bytes, Bytes>> messageSender);
+               default boolean canGeneratePoisonPill()
+               {
+                       return true;
+               }
+               default boolean canGenerateLogicError()
+               {
+                       return true;
+               }
+               default void assertBusinessLogic()
+               {
+                       log.debug("No business-logic to assert");
+               }
+       }
+       void sendMessage(ProducerRecord<Bytes, Bytes> record)
+       {
+               testRecordProducer.send(record, (metadata, e) ->
+               {
+                       if (metadata != null)
+                       {
+                               log.debug(
+                                               "{}|{} - {}={}",
+                                               metadata.partition(),
+                                               metadata.offset(),
+                                               record.key(),
+                                               record.value());
+                       }
+                       else
+                       {
+                               log.warn(
+                                               "Exception for {}={}: {}",
+                                               record.key(),
+                                               record.value(),
+                                               e.toString());
+                       }
+               });
+       }
+       @BeforeEach
+       public void init()
+       {
+               Properties props;
+               props = new Properties();
+               props.put("bootstrap.servers", properties.getBootstrapServer());
+               props.put("linger.ms", 100);
+               props.put("key.serializer", BytesSerializer.class.getName());
+               props.put("value.serializer", BytesSerializer.class.getName());
+               testRecordProducer = new KafkaProducer<>(props);
+               props = new Properties();
+               props.put("bootstrap.servers", properties.getBootstrapServer());
+               props.put("client.id", "OFFSET-CONSUMER");
+               props.put("group.id", properties.getGroupId());
+               props.put("key.deserializer", BytesDeserializer.class.getName());
+               props.put("value.deserializer", BytesDeserializer.class.getName());
+               offsetConsumer = new KafkaConsumer<>(props);
+               seekToEnd();
+               oldOffsets = new HashMap<>();
+               newOffsets = new HashMap<>();
+               receivedRecords = new HashSet<>();
+               doForCurrentOffsets((tp, offset) ->
+               {
+                       oldOffsets.put(tp, offset - 1);
+                       newOffsets.put(tp, offset - 1);
+               });
+               TestRecordHandler<K, V> captureOffsetAndExecuteTestHandler =
+                               new TestRecordHandler<K, V>(recordHandler)
+                               {
+                                       @Override
+                                       public void onNewRecord(ConsumerRecord<K, V> record)
+                                       {
+                                               newOffsets.put(
+                                                               new TopicPartition(record.topic(), record.partition()),
+                                                               record.offset());
+                                               receivedRecords.add(record);
+                                       }
+                               };
+               endlessConsumer =
+                               new EndlessConsumer<>(
+                                               executor,
+                                               properties.getClientId(),
+                                               properties.getTopic(),
+                                               kafkaConsumer,
+                                               rebalanceListener,
+                                               captureOffsetAndExecuteTestHandler);
+               endlessConsumer.start();
+       }
+       @AfterEach
+       public void deinit()
+       {
+               try
+               {
+                       endlessConsumer.stop();
+                       testRecordProducer.close();
+                       offsetConsumer.close();
+               }
+               catch (Exception e)
+               {
+                       log.info("Exception while stopping the consumer: {}", e.toString());
+               }
+       }
+       @TestConfiguration
+       @Import(ApplicationConfiguration.class)
+       public static class Configuration
+       {
+       }
+ }
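
The seekToEnd()/commitSync() sequence in the test's seekToEnd() helper relies on two properties of the consumer API: seekToEnd() is evaluated lazily and only takes effect on the next poll() or position() call (as the comment in the test notes), and commitSync() without arguments commits the consumer's current positions, so the freshly evaluated end offsets become the committed offsets of the group. A minimal standalone sketch of the same sequence (broker address, topic and group id are placeholders):

import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.BytesDeserializer;
import org.apache.kafka.common.utils.Bytes;

public class SeekToEndSketch
{
  public static void main(String[] args)
  {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "sketch");              // required for committing offsets
    props.put("key.deserializer", BytesDeserializer.class.getName());
    props.put("value.deserializer", BytesDeserializer.class.getName());

    try (KafkaConsumer<Bytes, Bytes> consumer = new KafkaConsumer<>(props))
    {
      List<TopicPartition> partitions = List.of(new TopicPartition("test", 0));

      consumer.assign(partitions);
      consumer.seekToEnd(partitions);             // lazy: only records the intent

      for (TopicPartition tp : partitions)
      {
        long offset = consumer.position(tp);      // forces evaluation of seekToEnd()
        System.out.println(tp + " is now at " + offset);
      }

      consumer.commitSync();                      // commits the current positions
    }
  }
}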