Back-ported refactorings from 'wordcount' to 'stored-offsets'
author     Kai Moritz <kai@juplo.de>
           Fri, 12 Aug 2022 15:32:24 +0000 (17:32 +0200)
committer  Kai Moritz <kai@juplo.de>
           Fri, 12 Aug 2022 15:35:17 +0000 (17:35 +0200)
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/DriverController.java
src/main/java/de/juplo/kafka/KeyCountingRebalanceListener.java
src/main/java/de/juplo/kafka/KeyCountingRecordHandler.java
src/test/java/de/juplo/kafka/ApplicationTests.java

@@@ -1,8 -1,7 +1,8 @@@
  package de.juplo.kafka;
  
- import org.apache.kafka.clients.consumer.ConsumerRecord;
+ import org.apache.kafka.clients.consumer.Consumer;
  import org.apache.kafka.clients.consumer.KafkaConsumer;
 +import org.apache.kafka.common.serialization.LongDeserializer;
  import org.apache.kafka.common.serialization.StringDeserializer;
  import org.springframework.boot.context.properties.EnableConfigurationProperties;
  import org.springframework.context.annotation.Bean;
@@@ -20,32 -18,44 +19,44 @@@ import java.util.concurrent.Executors
  public class ApplicationConfiguration
  {
    @Bean
-   public Consumer<ConsumerRecord<String, Long>> consumer()
 -  public WordcountRecordHandler wordcountRecordHandler()
++  public KeyCountingRecordHandler keyCountingRecordHandler()
    {
-     return (record) ->
-     {
-       // Handle record
-     };
 -    return new WordcountRecordHandler();
++    return new KeyCountingRecordHandler();
+   }
+ 
+   @Bean
 -  public WordcountRebalanceListener wordcountRebalanceListener(
 -      WordcountRecordHandler wordcountRecordHandler,
++  public KeyCountingRebalanceListener keyCountingRebalanceListener(
++      KeyCountingRecordHandler keyCountingRecordHandler,
+       PartitionStatisticsRepository repository,
 -      Consumer<String, String> consumer,
++      Consumer<String, Long> consumer,
+       ApplicationProperties properties)
+   {
 -    return new WordcountRebalanceListener(
 -        wordcountRecordHandler,
++    return new KeyCountingRebalanceListener(
++        keyCountingRecordHandler,
+         repository,
+         properties.getClientId(),
+         properties.getTopic(),
+         Clock.systemDefaultZone(),
+         properties.getCommitInterval(),
+         consumer);
    }
  
    @Bean
 -  public EndlessConsumer<String, String> endlessConsumer(
 -      KafkaConsumer<String, String> kafkaConsumer,
 +  public EndlessConsumer<String, Long> endlessConsumer(
 +      KafkaConsumer<String, Long> kafkaConsumer,
        ExecutorService executor,
-       Consumer<ConsumerRecord<String, Long>> handler,
-       PartitionStatisticsRepository repository,
 -      WordcountRebalanceListener wordcountRebalanceListener,
 -      WordcountRecordHandler wordcountRecordHandler,
++      KeyCountingRebalanceListener keyCountingRebalanceListener,
++      KeyCountingRecordHandler keyCountingRecordHandler,
        ApplicationProperties properties)
    {
      return
          new EndlessConsumer<>(
              executor,
-             repository,
              properties.getClientId(),
              properties.getTopic(),
-             Clock.systemDefaultZone(),
-             properties.getCommitInterval(),
              kafkaConsumer,
-             handler);
 -            wordcountRebalanceListener,
 -            wordcountRecordHandler);
++            keyCountingRebalanceListener,
++            keyCountingRecordHandler);
    }
  
    @Bean
@@@ -2,11 -2,8 +2,7 @@@ package de.juplo.kafka
  
  import lombok.RequiredArgsConstructor;
  import org.springframework.http.HttpStatus;
- import org.springframework.web.bind.annotation.ExceptionHandler;
- import org.springframework.web.bind.annotation.GetMapping;
- import org.springframework.web.bind.annotation.PostMapping;
- import org.springframework.web.bind.annotation.ResponseStatus;
- import org.springframework.web.bind.annotation.RestController;
 -import org.springframework.http.ResponseEntity;
+ import org.springframework.web.bind.annotation.*;
  
  import java.util.Map;
  import java.util.concurrent.ExecutionException;
@@@ -17,6 -14,7 +13,7 @@@
  public class DriverController
  {
    private final EndlessConsumer consumer;
 -  private final WordcountRecordHandler wordcount;
++  private final KeyCountingRecordHandler keyCountingRecordHandler;
  
  
    @PostMapping("start")
  
  
    @GetMapping("seen")
 -  public Map<Integer, Map<String, Map<String, Long>>> seen()
 +  public Map<Integer, Map<String, Long>> seen()
    {
-     return consumer.getSeen();
 -    return wordcount.getSeen();
 -  }
 -
 -  @GetMapping("seen/{user}")
 -  public ResponseEntity<Map<String, Long>> seen(@PathVariable String user)
 -  {
 -    for (Map<String, Map<String, Long>> users : wordcount.getSeen().values())
 -    {
 -      Map<String, Long> words = users.get(user);
 -      if (words != null)
 -        return ResponseEntity.ok(words);
 -    }
 -
 -    return ResponseEntity.notFound().build();
++    return keyCountingRecordHandler.getSeen();
    }
  
  
index 0000000,0000000..4a2c036
new file mode 100644
--- /dev/null
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/KeyCountingRebalanceListener.java
@@@ -1,0 -1,0 +1,83 @@@
++package de.juplo.kafka;
++
++import lombok.RequiredArgsConstructor;
++import lombok.extern.slf4j.Slf4j;
++import org.apache.kafka.clients.consumer.Consumer;
++import org.apache.kafka.common.TopicPartition;
++
++import java.time.Clock;
++import java.time.Duration;
++import java.time.Instant;
++import java.util.Collection;
++import java.util.Map;
++
++
++@RequiredArgsConstructor
++@Slf4j
++public class KeyCountingRebalanceListener implements PollIntervalAwareConsumerRebalanceListener
++{
++  private final KeyCountingRecordHandler handler;
++  private final PartitionStatisticsRepository repository;
++  private final String id;
++  private final String topic;
++  private final Clock clock;
++  private final Duration commitInterval;
++  private final Consumer<String, Long> consumer;
++
++  private Instant lastCommit = Instant.EPOCH;
++
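++  // On (re-)assignment: restore each partition's statistics from MongoDB and
++  // seek to the stored offset, if one was saved for that partition.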
++  @Override
++  public void onPartitionsAssigned(Collection<TopicPartition> partitions)
++  {
++    partitions.forEach(tp ->
++    {
++      Integer partition = tp.partition();
++      Long offset = consumer.position(tp);
++      log.info("{} - adding partition: {}, offset={}", id, partition, offset);
++      StatisticsDocument document =
++          repository
++              .findById(Integer.toString(partition))
++              .orElse(new StatisticsDocument(partition));
++      if (document.offset >= 0)
++      {
++        // Only seek, if a stored offset was found
++        // Otherwise: Use initial offset, generated by Kafka
++        consumer.seek(tp, document.offset);
++      }
++      handler.addPartition(partition, document.statistics);
++    });
++  }
++
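++  // Before partitions are taken away: persist the current statistics together
++  // with the offset of the next unconsumed message.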
++  @Override
++  public void onPartitionsRevoked(Collection<TopicPartition> partitions)
++  {
++    partitions.forEach(tp ->
++    {
++      Integer partition = tp.partition();
++      Long newOffset = consumer.position(tp);
++      log.info(
++          "{} - removing partition: {}, offset of next message {})",
++          id,
++          partition,
++          newOffset);
++      Map<String, Long> removed = handler.removePartition(partition);
++      repository.save(new StatisticsDocument(partition, removed, newOffset));
++    });
++  }
++
++
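++  // Periodic flush: store statistics and offsets once the configured commit
++  // interval has elapsed since the last store.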
++  @Override
++  public void beforeNextPoll()
++  {
++    if (lastCommit.plus(commitInterval).isBefore(clock.instant()))
++    {
++      log.debug("Storing data and offsets, last commit: {}", lastCommit);
++      handler.getSeen().forEach((partition, statistics) -> repository.save(
++          new StatisticsDocument(
++              partition,
++              statistics,
++              consumer.position(new TopicPartition(topic, partition)))));
++      lastCommit = clock.instant();
++    }
++  }
++}
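The listener above depends on StatisticsDocument and PartitionStatisticsRepository, which this commit does not touch. As a rough sketch of the shape the code assumes (field names and constructor signatures are inferred from the usages above; the -1 default for offset is an assumption that makes the `document.offset >= 0` check skip the seek for fresh documents):

import java.util.HashMap;
import java.util.Map;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;
import org.springframework.data.mongodb.repository.MongoRepository;

// Hypothetical sketch, not part of this commit: the MongoDB document that
// stores the per-partition statistics together with the next offset to read.
@Document(collection = "statistics")
public class StatisticsDocument
{
  @Id
  public String id;
  public long offset = -1L; // assumed default, meaning "no stored offset yet"
  public Map<String, Long> statistics;

  public StatisticsDocument(Integer partition)
  {
    this.id = Integer.toString(partition);
    this.statistics = new HashMap<>();
  }

  public StatisticsDocument(Integer partition, Map<String, Long> statistics, long offset)
  {
    this.id = Integer.toString(partition);
    this.statistics = statistics;
    this.offset = offset;
  }
}

// Hypothetical sketch: a standard Spring Data MongoDB repository would cover
// the findById() and save() calls the listener makes.
public interface PartitionStatisticsRepository extends MongoRepository<StatisticsDocument, String>
{
}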
index 0000000,0000000..099dcf7
new file mode 100644
--- /dev/null
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/KeyCountingRecordHandler.java
@@@ -1,0 -1,0 +1,46 @@@
++package de.juplo.kafka;
++
++import lombok.extern.slf4j.Slf4j;
++import org.apache.kafka.clients.consumer.ConsumerRecord;
++
++import java.util.HashMap;
++import java.util.Map;
++
++
++@Slf4j
++public class KeyCountingRecordHandler implements RecordHandler<String, Long>
++{
++  private final Map<Integer, Map<String, Long>> seen = new HashMap<>();
++
++
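++  // Counts how many records were seen per key, grouped by partition.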
++  @Override
++  public void accept(ConsumerRecord<String, Long> record)
++  {
++    Integer partition = record.partition();
++    String key = record.key() == null ? "NULL" : record.key();
++    Map<String, Long> byKey = seen.get(partition);
++
++    // Increment the count for this key (merge() starts at 1L for unseen keys)
++    byKey.merge(key, 1L, Long::sum);
++  }
++
++  public void addPartition(Integer partition, Map<String, Long> statistics)
++  {
++    seen.put(partition, statistics);
++  }
++
++  public Map<String, Long> removePartition(Integer partition)
++  {
++    return seen.remove(partition);
++  }
++
++
++  public Map<Integer, Map<String, Long>> getSeen()
++  {
++    return seen;
++  }
++}
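KeyCountingRecordHandler implements RecordHandler<String, Long>, which is also not part of this diff. Judging from the accept() override above, the interface is presumably a thin specialization of java.util.function.Consumer, roughly:

import java.util.function.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;

// Assumed shape of the callback interface, inferred from the accept() override above.
public interface RecordHandler<K, V> extends Consumer<ConsumerRecord<K, V>>
{
}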
@@@ -21,7 -20,7 +21,6 @@@ import org.springframework.kafka.test.c
  import org.springframework.test.context.TestPropertySource;
  import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
  
--import java.time.Clock;
  import java.time.Duration;
  import java.util.*;
  import java.util.concurrent.ExecutionException;
@@@ -72,12 -70,15 +70,15 @@@ class ApplicationTest
        ExecutorService executor;
        @Autowired
        PartitionStatisticsRepository repository;
 -      WordcountRebalanceListener wordcountRebalanceListener;
+       @Autowired
 -      WordcountRecordHandler wordcountRecordHandler;
++      KeyCountingRebalanceListener keyCountingRebalanceListener;
+       @Autowired
++      KeyCountingRecordHandler keyCountingRecordHandler;
  
-       Consumer<ConsumerRecord<String, Long>> testHandler;
 -      EndlessConsumer<String, String> endlessConsumer;
 +      EndlessConsumer<String, Long> endlessConsumer;
        Map<TopicPartition, Long> oldOffsets;
        Map<TopicPartition, Long> newOffsets;
 -      Set<ConsumerRecord<String, String>> receivedRecords;
 +      Set<ConsumerRecord<String, Long>> receivedRecords;
  
  
      /** Test methods */
                        Optional<Long> offset = partitionStatisticsRepository.findById(partition).map(document -> document.offset);
			consumer.accept(tp, offset.orElse(0L));
                });
--      }
++              }
  
        List<TopicPartition> partitions()
        {
                        newOffsets.put(tp, offset - 1);
                });
  
-               Consumer<ConsumerRecord<String, Long>> captureOffsetAndExecuteTestHandler =
-                               record ->
-                               {
-                                       newOffsets.put(
-                                                       new TopicPartition(record.topic(), record.partition()),
-                                                       record.offset());
-                                       receivedRecords.add(record);
-                                       testHandler.accept(record);
 -              TestRecordHandler<String, String> captureOffsetAndExecuteTestHandler =
 -                              new TestRecordHandler<String, String>(wordcountRecordHandler) {
++              TestRecordHandler<String, Long> captureOffsetAndExecuteTestHandler =
++                              new TestRecordHandler<String, Long>(keyCountingRecordHandler) {
+                                       @Override
 -                                      public void onNewRecord(ConsumerRecord<String, String> record)
++                                      public void onNewRecord(ConsumerRecord<String, Long> record)
+                                       {
+                                               newOffsets.put(
+                                                               new TopicPartition(record.topic(), record.partition()),
+                                                               record.offset());
+                                               receivedRecords.add(record);
+                                       }
                                };
  
                endlessConsumer =
                                new EndlessConsumer<>(
                                                executor,
-                                               repository,
                                                properties.getClientId(),
                                                properties.getTopic(),
-                                               Clock.systemDefaultZone(),
-                                               properties.getCommitInterval(),
                                                kafkaConsumer,
 -                                              wordcountRebalanceListener,
++                                              keyCountingRebalanceListener,
                                                captureOffsetAndExecuteTestHandler);
  
                endlessConsumer.start();
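The test wraps keyCountingRecordHandler in a TestRecordHandler, which is not shown in this diff either. A minimal sketch consistent with the usage above (the class shape and the delegation order are assumptions):

import org.apache.kafka.clients.consumer.ConsumerRecord;

// Hypothetical sketch of the test helper: it forwards every record to the
// wrapped handler and exposes onNewRecord() as a hook for the test to override.
public abstract class TestRecordHandler<K, V> implements RecordHandler<K, V>
{
  private final RecordHandler<K, V> handler;

  public TestRecordHandler(RecordHandler<K, V> handler)
  {
    this.handler = handler;
  }

  public abstract void onNewRecord(ConsumerRecord<K, V> record);

  @Override
  public void accept(ConsumerRecord<K, V> record)
  {
    handler.accept(record);
    onNewRecord(record);
  }
}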