Merged improvements from 'stored-offsets' into 'stored-state'
author    Kai Moritz <kai@juplo.de>
Sat, 13 Aug 2022 08:45:03 +0000 (10:45 +0200)
committer Kai Moritz <kai@juplo.de>
Sat, 13 Aug 2022 08:45:40 +0000 (10:45 +0200)
docker-compose.yml
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/EndlessConsumer.java
src/main/java/de/juplo/kafka/KeyCountingRebalanceListener.java
src/main/java/de/juplo/kafka/StatisticsDocument.java
src/test/java/de/juplo/kafka/ApplicationIT.java
src/test/java/de/juplo/kafka/ApplicationTests.java

@@@ -40,14 -40,16 +40,16 @@@ services
        ME_CONFIG_MONGODB_ADMINUSERNAME: juplo
        ME_CONFIG_MONGODB_ADMINPASSWORD: training
        ME_CONFIG_MONGODB_URL: mongodb://juplo:training@mongo:27017/
+     depends_on:
+       - mongo
  
 -  setup:
 -    image: juplo/toolbox
 -    command: >
 -      bash -c "
 -        kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic test
 -        kafka-topics --bootstrap-server kafka:9092 --create --topic test --partitions 2
 -      "
 +  kafka-ui:
 +    image: provectuslabs/kafka-ui:0.3.3
 +    ports:
 +      - 8080:8080
 +    environment:
 +      KAFKA_CLUSTERS_0_NAME: local
 +      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
  
    cli:
      image: juplo/toolbox
@@@ -1,6 -1,6 +1,5 @@@
  package de.juplo.kafka;
  
- import org.apache.kafka.clients.consumer.ConsumerRecord;
 -import org.apache.kafka.clients.consumer.Consumer;
  import org.apache.kafka.clients.consumer.KafkaConsumer;
  import org.apache.kafka.common.serialization.LongDeserializer;
  import org.apache.kafka.common.serialization.StringDeserializer;
@@@ -19,12 -19,26 +18,23 @@@ import java.util.concurrent.Executors
  public class ApplicationConfiguration
  {
    @Bean
-   public Consumer<ConsumerRecord<String, Long>> consumer()
 -  public KeyCountingRecordHandler messageCountingRecordHandler()
++  public KeyCountingRecordHandler keyCountingRecordHandler()
    {
-     return (record) ->
-     {
-       // Handle record
-     };
+     return new KeyCountingRecordHandler();
+   }
+   @Bean
 -  public KeyCountingRebalanceListener wordcountRebalanceListener(
++  public KeyCountingRebalanceListener keyCountingRebalanceListener(
+       KeyCountingRecordHandler keyCountingRecordHandler,
+       PartitionStatisticsRepository repository,
 -      Consumer<String, Long> consumer,
+       ApplicationProperties properties)
+   {
+     return new KeyCountingRebalanceListener(
+         keyCountingRecordHandler,
+         repository,
+         properties.getClientId(),
 -        properties.getTopic(),
+         Clock.systemDefaultZone(),
 -        properties.getCommitInterval(),
 -        consumer);
++        properties.getCommitInterval());
    }
  
    @Bean
      Properties props = new Properties();
  
      props.put("bootstrap.servers", properties.getBootstrapServer());
+     props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
      props.put("group.id", properties.getGroupId());
      props.put("client.id", properties.getClientId());
 -    props.put("enable.auto.commit", false);
      props.put("auto.offset.reset", properties.getAutoOffsetReset());
+     props.put("auto.commit.interval.ms", (int)properties.getCommitInterval().toMillis());
      props.put("metadata.max.age.ms", "1000");
      props.put("key.deserializer", StringDeserializer.class.getName());
      props.put("value.deserializer", LongDeserializer.class.getName());
index 0000000,4a2c036..636ff86
mode 000000,100644..100644
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/KeyCountingRebalanceListener.java
@@@ -1,0 -1,83 +1,76 @@@
 -import org.apache.kafka.clients.consumer.Consumer;
+ package de.juplo.kafka;
+ import lombok.RequiredArgsConstructor;
+ import lombok.extern.slf4j.Slf4j;
 -  private final String topic;
+ import org.apache.kafka.common.TopicPartition;
+ import java.time.Clock;
+ import java.time.Duration;
+ import java.time.Instant;
+ import java.util.Collection;
+ import java.util.Map;
+ @RequiredArgsConstructor
+ @Slf4j
+ public class KeyCountingRebalanceListener implements PollIntervalAwareConsumerRebalanceListener
+ {
+   private final KeyCountingRecordHandler handler;
+   private final PartitionStatisticsRepository repository;
+   private final String id;
 -  private final Consumer<String, Long> consumer;
+   private final Clock clock;
+   private final Duration commitInterval;
 -      Long offset = consumer.position(tp);
 -      log.info("{} - adding partition: {}, offset={}", id, partition, offset);
+   private Instant lastCommit = Instant.EPOCH;
+   @Override
+   public void onPartitionsAssigned(Collection<TopicPartition> partitions)
+   {
+     partitions.forEach(tp ->
+     {
+       Integer partition = tp.partition();
 -      if (document.offset >= 0)
 -      {
 -        // Only seek, if a stored offset was found
 -        // Otherwise: Use initial offset, generated by Kafka
 -        consumer.seek(tp, document.offset);
 -      }
++      log.info("{} - adding partition: {}", id, partition);
+       StatisticsDocument document =
+           repository
+               .findById(Integer.toString(partition))
+               .orElse(new StatisticsDocument(partition));
 -      Long newOffset = consumer.position(tp);
 -      log.info(
 -          "{} - removing partition: {}, offset of next message {})",
 -          id,
 -          partition,
 -          newOffset);
+       handler.addPartition(partition, document.statistics);
+     });
+   }
+   @Override
+   public void onPartitionsRevoked(Collection<TopicPartition> partitions)
+   {
+     partitions.forEach(tp ->
+     {
+       Integer partition = tp.partition();
 -      repository.save(new StatisticsDocument(partition, removed, consumer.position(tp)));
++      log.info("{} - removing partition: {}", id, partition);
+       Map<String, Long> removed = handler.removePartition(partition);
 -      log.debug("Storing data and offsets, last commit: {}", lastCommit);
++      for (String key : removed.keySet())
++      {
++        log.info(
++            "{} - Seen {} messages for partition={}|key={}",
++            id,
++            removed.get(key),
++            partition,
++            key);
++      }
++      repository.save(new StatisticsDocument(partition, removed));
+     });
+   }
+   @Override
+   public void beforeNextPoll()
+   {
+     if (lastCommit.plus(commitInterval).isBefore(clock.instant()))
+     {
 -              statistics,
 -              consumer.position(new TopicPartition(topic, partiton)))));
++      log.debug("Storing data, last commit: {}", lastCommit);
+       handler.getSeen().forEach((partition, statistics) -> repository.save(
+           new StatisticsDocument(
+               partition,
++              statistics)));
+       lastCommit = clock.instant();
+     }
+   }
+ }
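
The PollIntervalAwareConsumerRebalanceListener interface implemented here is not shown in this diff; judging from beforeNextPoll(), the poll loop in EndlessConsumer (whose changes are not included in this excerpt) presumably invokes it once per iteration, roughly like this sketch:

    import java.time.Duration;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    // Assumed shape of the loop in EndlessConsumer (not from this diff),
    // assuming the handler exposes accept() as sketched above.
    void run(
        KafkaConsumer<String, Long> consumer,
        KeyCountingRecordHandler handler,
        KeyCountingRebalanceListener rebalanceListener)
    {
      while (true)
      {
        ConsumerRecords<String, Long> records =
            consumer.poll(Duration.ofSeconds(1));

        for (ConsumerRecord<String, Long> record : records)
          handler.accept(record);

        // Lets the listener store the counters in MongoDB once the
        // configured commit-interval has elapsed
        rebalanceListener.beforeNextPoll();
      }
    }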
@@@ -20,7 -21,13 +20,13 @@@ public class StatisticsDocument
    {
    }
  
 -  public StatisticsDocument(Integer partition, Map<String, Long> statistics, long offset)
+   public StatisticsDocument(Integer partition)
+   {
+     this.id = Integer.toString(partition);
+     this.statistics = new HashMap<>();
+   }
 +  public StatisticsDocument(Integer partition, Map<String, Long> statistics)
    {
      this.id = Integer.toString(partition);
      this.statistics = statistics;
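
With the stored offset gone, StatisticsDocument only carries the per-partition counting state. The full class is not shown in this diff; pieced together from the constructors above and the field accesses in KeyCountingRebalanceListener, it presumably looks like this (the collection name is an assumption):

    package de.juplo.kafka;

    import org.springframework.data.annotation.Id;
    import org.springframework.data.mongodb.core.mapping.Document;

    import java.util.HashMap;
    import java.util.Map;

    @Document(collection = "statistics") // collection name assumed
    public class StatisticsDocument
    {
      @Id
      public String id;
      public Map<String, Long> statistics;

      public StatisticsDocument()
      {
      }

      public StatisticsDocument(Integer partition)
      {
        this.id = Integer.toString(partition);
        this.statistics = new HashMap<>();
      }

      public StatisticsDocument(Integer partition, Map<String, Long> statistics)
      {
        this.id = Integer.toString(partition);
        this.statistics = statistics;
      }
    }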
index 0000000,cded0ee..d1d8e50
mode 000000,100644..100644
--- /dev/null
+++ b/src/test/java/de/juplo/kafka/ApplicationIT.java
@@@ -1,0 -1,43 +1,43 @@@
 -  public void testApplicationStartup()
+ package de.juplo.kafka;
+ import org.junit.jupiter.api.Test;
+ import org.springframework.beans.factory.annotation.Autowired;
+ import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo;
+ import org.springframework.boot.test.context.SpringBootTest;
+ import org.springframework.boot.test.web.client.TestRestTemplate;
+ import org.springframework.boot.test.web.server.LocalServerPort;
+ import org.springframework.kafka.test.context.EmbeddedKafka;
+ import static de.juplo.kafka.ApplicationTests.TOPIC;
+ @SpringBootTest(
+     webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
+     properties = {
+         "consumer.bootstrap-server=${spring.embedded.kafka.brokers}",
+         "consumer.topic=" + TOPIC,
+         "spring.mongodb.embedded.version=4.4.13" })
+ @EmbeddedKafka(topics = TOPIC)
+ @AutoConfigureDataMongo
+ public class ApplicationIT
+ {
+   public static final String TOPIC = "FOO";
+   @LocalServerPort
+   private int port;
+   @Autowired
+   private TestRestTemplate restTemplate;
+   @Test
++  public void testApplicationStartup()
+   {
+     restTemplate.getForObject(
+         "http://localhost:" + port + "/actuator/health",
+         String.class
+         )
+         .contains("UP");
+   }
+ }
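
Note that the call to contains("UP") only computes a boolean that is then discarded, so this smoke test passes even if the health endpoint does not report UP. A stricter variant, sketched here with AssertJ (which spring-boot-starter-test already provides), would assert on the response:

    import static org.assertj.core.api.Assertions.assertThat;

    @Test
    public void testApplicationStartup()
    {
      String health = restTemplate.getForObject(
          "http://localhost:" + port + "/actuator/health",
          String.class);

      assertThat(health).contains("UP");
    }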
@@@ -69,9 -70,12 +68,10 @@@ class ApplicationTests
        @Autowired
        ExecutorService executor;
        @Autowired
--      PartitionStatisticsRepository repository;
 -      @Autowired
+       KeyCountingRebalanceListener keyCountingRebalanceListener;
+       @Autowired
+       KeyCountingRecordHandler keyCountingRecordHandler;
  
-       Consumer<ConsumerRecord<String, Long>> testHandler;
        EndlessConsumer<String, Long> endlessConsumer;
        Map<TopicPartition, Long> oldOffsets;
        Map<TopicPartition, Long> newOffsets;
  
        /** Helper methods for setting up and running the tests */
  
 -                      Integer partition = tp.partition();
 -                      StatisticsDocument document =
 -                                      partitionStatisticsRepository
 -                                                      .findById(partition.toString())
 -                                                      .orElse(new StatisticsDocument(partition));
 -                      document.offset = offset;
 -                      partitionStatisticsRepository.save(document);
+       void seekToEnd()
+       {
+               offsetConsumer.assign(partitions());
++              offsetConsumer.seekToEnd(partitions());
+               partitions().forEach(tp ->
+               {
++                      // seekToEnd() works lazily: it only takes effect on poll()/position()
+                       Long offset = offsetConsumer.position(tp);
+                       log.info("New position for {}: {}", tp, offset);
+               });
++              // The new positions must be committed!
++              offsetConsumer.commitSync();
+               offsetConsumer.unsubscribe();
+       }
        void doForCurrentOffsets(BiConsumer<TopicPartition, Long> consumer)
        {
 -              partitions().forEach(tp ->
 -              {
 -                      String partition = Integer.toString(tp.partition());
 -                      Optional<Long> offset = partitionStatisticsRepository.findById(partition).map(document -> document.offset);
 -                      consumer.accept(tp, offset.orElse(0l));
 -              });
 -              }
 +              offsetConsumer.assign(partitions());
 +              partitions().forEach(tp -> consumer.accept(tp, offsetConsumer.position(tp)));
 +              offsetConsumer.unsubscribe();
 +      }
  
        List<TopicPartition> partitions()
        {
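
The offsetConsumer used by seekToEnd() and doForCurrentOffsets() is configured outside of this excerpt; for the commitSync() in seekToEnd() to take effect for the application under test, it has to join the same consumer group. A hypothetical set-up along those lines:

    import java.util.Properties;

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.BytesDeserializer;
    import org.apache.kafka.common.utils.Bytes;

    // Assumed configuration of the offsetConsumer (not part of this diff):
    // it must share the application's group.id, so that the positions
    // committed in seekToEnd() are picked up by the consumer under test.
    Properties props = new Properties();
    props.put("bootstrap.servers", properties.getBootstrapServer());
    props.put("group.id", properties.getGroupId());
    props.put("key.deserializer", BytesDeserializer.class.getName());
    props.put("value.deserializer", BytesDeserializer.class.getName());

    KafkaConsumer<Bytes, Bytes> offsetConsumer = new KafkaConsumer<>(props);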