refactor: Handling der Partitionen in WordcountRebalanceListener
authorKai Moritz <kai@juplo.de>
Fri, 12 Aug 2022 09:53:46 +0000 (11:53 +0200)
committerKai Moritz <kai@juplo.de>
Fri, 12 Aug 2022 14:45:21 +0000 (16:45 +0200)
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/RecordHandler.java
src/main/java/de/juplo/kafka/WordcountRebalanceListener.java
src/main/java/de/juplo/kafka/WordcountRecordHandler.java
src/test/java/de/juplo/kafka/TestRecordHandler.java

index da1605b..0d17823 100644 (file)
@@ -25,7 +25,6 @@ public class ApplicationConfiguration
   {
     return new WordcountRecordHandler(
         repository,
-        properties.getClientId(),
         properties.getTopic(),
         Clock.systemDefaultZone(),
         properties.getCommitInterval(),
@@ -34,9 +33,16 @@ public class ApplicationConfiguration
 
   @Bean
   public WordcountRebalanceListener wordcountRebalanceListener(
-      WordcountRecordHandler wordcountRecordHandler)
+      WordcountRecordHandler wordcountRecordHandler,
+      PartitionStatisticsRepository repository,
+      Consumer<String, String> consumer,
+      ApplicationProperties properties)
   {
-    return new WordcountRebalanceListener(wordcountRecordHandler);
+    return new WordcountRebalanceListener(
+        wordcountRecordHandler,
+        repository,
+        properties.getClientId(),
+        consumer);
   }
 
   @Bean
index ff2f193..3c9dd15 100644 (file)
@@ -1,7 +1,6 @@
 package de.juplo.kafka;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.TopicPartition;
 
 import java.util.function.Consumer;
 
@@ -9,8 +8,4 @@ import java.util.function.Consumer;
 public interface RecordHandler<K, V> extends Consumer<ConsumerRecord<K,V>>
 {
   default void beforeNextPoll() {}
-
-  default void onPartitionAssigned(TopicPartition tp) {}
-
-  default void onPartitionRevoked(TopicPartition tp) {}
 }
index fd551c2..9a69c8f 100644 (file)
@@ -1,27 +1,61 @@
 package de.juplo.kafka;
 
 import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.common.TopicPartition;
 
 import java.util.Collection;
+import java.util.Map;
 
 
 @RequiredArgsConstructor
+@Slf4j
 public class WordcountRebalanceListener implements ConsumerRebalanceListener
 {
-  private final RecordHandler<String, String> handler;
+  private final WordcountRecordHandler handler;
+  private final PartitionStatisticsRepository repository;
+  private final String id;
+  private final Consumer<String, String> consumer;
 
 
   @Override
   public void onPartitionsAssigned(Collection<TopicPartition> partitions)
   {
-    partitions.forEach(tp -> handler.onPartitionAssigned(tp));
+    partitions.forEach(tp ->
+    {
+      Integer partition = tp.partition();
+      Long offset = consumer.position(tp);
+      log.info("{} - adding partition: {}, offset={}", id, partition, offset);
+      StatisticsDocument document =
+          repository
+              .findById(Integer.toString(partition))
+              .orElse(new StatisticsDocument(partition));
+      if (document.offset >= 0)
+      {
+        // Only seek, if a stored offset was found
+        // Otherwise: Use initial offset, generated by Kafka
+        consumer.seek(tp, document.offset);
+      }
+      handler.addPartition(partition, document.statistics);
+    });
   }
 
   @Override
   public void onPartitionsRevoked(Collection<TopicPartition> partitions)
   {
-    partitions.forEach(tp -> handler.onPartitionRevoked(tp));
+    partitions.forEach(tp ->
+    {
+      Integer partition = tp.partition();
+      Long newOffset = consumer.position(tp);
+      log.info(
+          "{} - removing partition: {}, offset of next message {})",
+          id,
+          partition,
+          newOffset);
+      Map<String, Map<String, Long>> removed = handler.removePartition(partition);
+      repository.save(new StatisticsDocument(partition, removed, consumer.position(tp)));
+    });
   }
 }
index 5981c7d..bdf4b32 100644 (file)
@@ -22,7 +22,6 @@ public class WordcountRecordHandler implements RecordHandler<String, String>
 
 
   private final PartitionStatisticsRepository repository;
-  private final String id;
   private final String topic;
   private final Clock clock;
   private final Duration commitInterval;
@@ -78,37 +77,14 @@ public class WordcountRecordHandler implements RecordHandler<String, String>
     }
   }
 
-  @Override
-  public void onPartitionAssigned(TopicPartition tp)
+  public void addPartition(Integer partition, Map<String, Map<String, Long>> statistics)
   {
-    Integer partition = tp.partition();
-    Long offset = consumer.position(tp);
-    log.info("{} - adding partition: {}, offset={}", id, partition, offset);
-    StatisticsDocument document =
-        repository
-            .findById(Integer.toString(partition))
-            .orElse(new StatisticsDocument(partition));
-    if (document.offset >= 0)
-    {
-      // Only seek, if a stored offset was found
-      // Otherwise: Use initial offset, generated by Kafka
-      consumer.seek(tp, document.offset);
-    }
-    seen.put(partition, document.statistics);
+    seen.put(partition, statistics);
   }
 
-  @Override
-  public void onPartitionRevoked(TopicPartition tp)
+  public Map<String, Map<String, Long>> removePartition(Integer partition)
   {
-    Integer partition = tp.partition();
-    Long newOffset = consumer.position(tp);
-    log.info(
-        "{} - removing partition: {}, offset of next message {})",
-        id,
-        partition,
-        newOffset);
-    Map<String, Map<String, Long>> removed = seen.remove(partition);
-    repository.save(new StatisticsDocument(partition, removed, consumer.position(tp)));
+    return seen.remove(partition);
   }
 
 
index 4047093..de28385 100644 (file)
@@ -2,7 +2,6 @@ package de.juplo.kafka;
 
 import lombok.RequiredArgsConstructor;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.TopicPartition;
 
 
 @RequiredArgsConstructor
@@ -26,16 +25,4 @@ public abstract class TestRecordHandler<K, V> implements RecordHandler<K, V>
   {
     handler.beforeNextPoll();
   }
-
-  @Override
-  public void onPartitionAssigned(TopicPartition tp)
-  {
-    handler.onPartitionAssigned(tp);
-  }
-
-  @Override
-  public void onPartitionRevoked(TopicPartition tp)
-  {
-    handler.onPartitionRevoked(tp);
-  }
 }