refactor: Alle Kafka-Belange in den `WordcountRebalanceListener` verschoben
authorKai Moritz <kai@juplo.de>
Fri, 12 Aug 2022 10:04:27 +0000 (12:04 +0200)
committerKai Moritz <kai@juplo.de>
Fri, 12 Aug 2022 14:45:21 +0000 (16:45 +0200)
* Dafür neues Interface `PollIntervalAwareRebalanceListener` eingeführt.
* `WordcountRebalanceListener` implementiert das neue Interface und
  kümmert sich um alle Kafka-Belange.
* `WordcountRecordHandler` kümmert sich nur noch um die Fachlogik.

src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/EndlessConsumer.java
src/main/java/de/juplo/kafka/PollIntervalAwareConsumerRebalanceListener.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/WordcountRebalanceListener.java
src/main/java/de/juplo/kafka/WordcountRecordHandler.java

index 0d17823..d48c027 100644 (file)
@@ -18,17 +18,9 @@ import java.util.concurrent.Executors;
 public class ApplicationConfiguration
 {
   @Bean
-  public WordcountRecordHandler wordcountRecordHandler(
-      PartitionStatisticsRepository repository,
-      Consumer<String, String> consumer,
-      ApplicationProperties properties)
+  public WordcountRecordHandler wordcountRecordHandler()
   {
-    return new WordcountRecordHandler(
-        repository,
-        properties.getTopic(),
-        Clock.systemDefaultZone(),
-        properties.getCommitInterval(),
-        consumer);
+    return new WordcountRecordHandler();
   }
 
   @Bean
@@ -42,6 +34,9 @@ public class ApplicationConfiguration
         wordcountRecordHandler,
         repository,
         properties.getClientId(),
+        properties.getTopic(),
+        Clock.systemDefaultZone(),
+        properties.getCommitInterval(),
         consumer);
   }
 
index 0f3316d..58557f2 100644 (file)
@@ -25,7 +25,7 @@ public class EndlessConsumer<K, V> implements Runnable
   private final String id;
   private final String topic;
   private final Consumer<K, V> consumer;
-  private final ConsumerRebalanceListener rebalanceListener;
+  private final PollIntervalAwareConsumerRebalanceListener pollIntervalAwareRebalanceListener;
   private final RecordHandler<K, V> handler;
 
   private final Lock lock = new ReentrantLock();
@@ -42,7 +42,7 @@ public class EndlessConsumer<K, V> implements Runnable
     try
     {
       log.info("{} - Subscribing to topic {}", id, topic);
-      consumer.subscribe(Arrays.asList(topic), rebalanceListener);
+      consumer.subscribe(Arrays.asList(topic), pollIntervalAwareRebalanceListener);
 
       while (true)
       {
@@ -68,7 +68,7 @@ public class EndlessConsumer<K, V> implements Runnable
           consumed++;
         }
 
-        handler.beforeNextPoll();
+        pollIntervalAwareRebalanceListener.beforeNextPoll();
       }
     }
     catch(WakeupException e)
diff --git a/src/main/java/de/juplo/kafka/PollIntervalAwareConsumerRebalanceListener.java b/src/main/java/de/juplo/kafka/PollIntervalAwareConsumerRebalanceListener.java
new file mode 100644 (file)
index 0000000..8abec12
--- /dev/null
@@ -0,0 +1,9 @@
+package de.juplo.kafka;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+
+
+public interface PollIntervalAwareConsumerRebalanceListener extends ConsumerRebalanceListener
+{
+  default void beforeNextPoll() {}
+}
index 9a69c8f..9f2fc0f 100644 (file)
@@ -3,22 +3,28 @@ package de.juplo.kafka;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.common.TopicPartition;
 
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
 import java.util.Collection;
 import java.util.Map;
 
 
 @RequiredArgsConstructor
 @Slf4j
-public class WordcountRebalanceListener implements ConsumerRebalanceListener
+public class WordcountRebalanceListener implements PollIntervalAwareConsumerRebalanceListener
 {
   private final WordcountRecordHandler handler;
   private final PartitionStatisticsRepository repository;
   private final String id;
+  private final String topic;
+  private final Clock clock;
+  private final Duration commitInterval;
   private final Consumer<String, String> consumer;
 
+  private Instant lastCommit = Instant.EPOCH;
 
   @Override
   public void onPartitionsAssigned(Collection<TopicPartition> partitions)
@@ -58,4 +64,20 @@ public class WordcountRebalanceListener implements ConsumerRebalanceListener
       repository.save(new StatisticsDocument(partition, removed, consumer.position(tp)));
     });
   }
+
+
+  @Override
+  public void beforeNextPoll()
+  {
+    if (lastCommit.plus(commitInterval).isBefore(clock.instant()))
+    {
+      log.debug("Storing data and offsets, last commit: {}", lastCommit);
+      handler.getSeen().forEach((partiton, statistics) -> repository.save(
+          new StatisticsDocument(
+              partiton,
+              statistics,
+              consumer.position(new TopicPartition(topic, partiton)))));
+      lastCommit = clock.instant();
+    }
+  }
 }
index bdf4b32..4efc547 100644 (file)
@@ -1,36 +1,21 @@
 package de.juplo.kafka;
 
-import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.TopicPartition;
 
-import java.time.Clock;
-import java.time.Duration;
-import java.time.Instant;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.regex.Pattern;
 
 
-@RequiredArgsConstructor
 @Slf4j
 public class WordcountRecordHandler implements RecordHandler<String, String>
 {
   final static Pattern PATTERN = Pattern.compile("\\W+");
 
 
-  private final PartitionStatisticsRepository repository;
-  private final String topic;
-  private final Clock clock;
-  private final Duration commitInterval;
-  private final Consumer<String, String> consumer;
-
   private final Map<Integer, Map<String, Map<String, Long>>> seen = new HashMap<>();
 
-  private Instant lastCommit = Instant.EPOCH;
-
 
   @Override
   public void accept(ConsumerRecord<String, String> record)
@@ -61,22 +46,6 @@ public class WordcountRecordHandler implements RecordHandler<String, String>
     }
   }
 
-
-  @Override
-  public void beforeNextPoll()
-  {
-    if (lastCommit.plus(commitInterval).isBefore(clock.instant()))
-    {
-      log.debug("Storing data and offsets, last commit: {}", lastCommit);
-      seen.forEach((partiton, statistics) -> repository.save(
-          new StatisticsDocument(
-              partiton,
-              statistics,
-              consumer.position(new TopicPartition(topic, partiton)))));
-      lastCommit = clock.instant();
-    }
-  }
-
   public void addPartition(Integer partition, Map<String, Map<String, Long>> statistics)
   {
     seen.put(partition, statistics);