Verbesserungen aus 'rebalance-listener' nach 'counting-consumer' gemerged
authorKai Moritz <kai@juplo.de>
Sat, 13 Aug 2022 09:15:54 +0000 (11:15 +0200)
committerKai Moritz <kai@juplo.de>
Sat, 13 Aug 2022 09:25:55 +0000 (11:25 +0200)
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/EndlessConsumer.java
src/main/java/de/juplo/kafka/KeyCountingRebalanceListener.java [deleted file]
src/main/java/de/juplo/kafka/KeyCountingRecordHandler.java
src/test/java/de/juplo/kafka/ApplicationTests.java

index 7a0a8ad..bb219d0 100644 (file)
@@ -22,21 +22,10 @@ public class ApplicationConfiguration
     return new KeyCountingRecordHandler();
   }
 
-  @Bean
-  public KeyCountingRebalanceListener keyCountingRebalanceListener(
-      KeyCountingRecordHandler keyCountingRecordHandler,
-      ApplicationProperties properties)
-  {
-    return new KeyCountingRebalanceListener(
-        keyCountingRecordHandler,
-        properties.getClientId());
-  }
-
   @Bean
   public EndlessConsumer<String, Long> endlessConsumer(
       KafkaConsumer<String, Long> kafkaConsumer,
       ExecutorService executor,
-      KeyCountingRebalanceListener keyCountingRebalanceListener,
       KeyCountingRecordHandler keyCountingRecordHandler,
       ApplicationProperties properties)
   {
@@ -46,7 +35,6 @@ public class ApplicationConfiguration
             properties.getClientId(),
             properties.getTopic(),
             kafkaConsumer,
-            keyCountingRebalanceListener,
             keyCountingRecordHandler);
   }
 
index c7579b8..63a2f93 100644 (file)
@@ -25,7 +25,6 @@ public class EndlessConsumer<K, V> implements Runnable
   private final String id;
   private final String topic;
   private final Consumer<K, V> consumer;
-  private final ConsumerRebalanceListener consumerRebalanceListener;
   private final RecordHandler<K, V> handler;
 
   private final Lock lock = new ReentrantLock();
@@ -42,7 +41,7 @@ public class EndlessConsumer<K, V> implements Runnable
     try
     {
       log.info("{} - Subscribing to topic {}", id, topic);
-      consumer.subscribe(Arrays.asList(topic), consumerRebalanceListener);
+      consumer.subscribe(Arrays.asList(topic));
 
       while (true)
       {
diff --git a/src/main/java/de/juplo/kafka/KeyCountingRebalanceListener.java b/src/main/java/de/juplo/kafka/KeyCountingRebalanceListener.java
deleted file mode 100644 (file)
index 0ad1f31..0000000
+++ /dev/null
@@ -1,50 +0,0 @@
-package de.juplo.kafka;
-
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.common.TopicPartition;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-
-
-@RequiredArgsConstructor
-@Slf4j
-public class KeyCountingRebalanceListener implements ConsumerRebalanceListener
-{
-  private final KeyCountingRecordHandler handler;
-  private final String id;
-
-  @Override
-  public void onPartitionsAssigned(Collection<TopicPartition> partitions)
-  {
-    partitions.forEach(tp ->
-    {
-      Integer partition = tp.partition();
-      log.info("{} - adding partition: {}", id, partition);
-      handler.addPartition(partition, new HashMap<>());
-    });
-  }
-
-  @Override
-  public void onPartitionsRevoked(Collection<TopicPartition> partitions)
-  {
-    partitions.forEach(tp ->
-    {
-      Integer partition = tp.partition();
-      log.info("{} - removing partition: {}", id, partition);
-      Map<String, Long> removed = handler.removePartition(partition);
-      for (String key : removed.keySet())
-      {
-        log.info(
-            "{} - Seen {} messages for partition={}|key={}",
-            id,
-            removed.get(key),
-            partition,
-            key);
-      }
-    });
-  }
-}
index 099dcf7..83b3ff2 100644 (file)
@@ -18,6 +18,10 @@ public class KeyCountingRecordHandler implements RecordHandler<String, Long>
   {
     Integer partition = record.partition();
     String key = record.key() == null ? "NULL" : record.key().toString();
+
+    if (!seen.containsKey(partition))
+      seen.put(partition, new HashMap<>());
+
     Map<String, Long> byKey = seen.get(partition);
 
     if (!byKey.containsKey(key))
@@ -28,16 +32,6 @@ public class KeyCountingRecordHandler implements RecordHandler<String, Long>
     byKey.put(key, seenByKey);
   }
 
-  public void addPartition(Integer partition, Map<String, Long> statistics)
-  {
-    seen.put(partition, statistics);
-  }
-
-  public Map<String, Long> removePartition(Integer partition)
-  {
-    return seen.remove(partition);
-  }
-
 
   public Map<Integer, Map<String, Long>> getSeen()
   {
index 5b13b7d..0909f2c 100644 (file)
@@ -65,8 +65,6 @@ class ApplicationTests
        @Autowired
        ExecutorService executor;
        @Autowired
-       KeyCountingRebalanceListener keyCountingRebalanceListener;
-       @Autowired
        KeyCountingRecordHandler keyCountingRecordHandler;
 
        EndlessConsumer<String, Long> endlessConsumer;
@@ -287,7 +285,6 @@ class ApplicationTests
                                                properties.getClientId(),
                                                properties.getTopic(),
                                                kafkaConsumer,
-                                               keyCountingRebalanceListener,
                                                captureOffsetAndExecuteTestHandler);
 
                endlessConsumer.start();