Vorlage vereinfacht: Rebalance-Listener entfernt
authorKai Moritz <kai@juplo.de>
Sun, 21 Aug 2022 11:06:19 +0000 (13:06 +0200)
committerKai Moritz <kai@juplo.de>
Tue, 6 Sep 2022 17:18:46 +0000 (19:18 +0200)
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java [deleted file]
src/main/java/de/juplo/kafka/EndlessConsumer.java
src/test/java/de/juplo/kafka/GenericApplicationTests.java

index 624a4ec..e887987 100644 (file)
@@ -33,25 +33,10 @@ public class ApplicationConfiguration
     return new AdderResults();
   }
 
-  @Bean
-  public ApplicationRebalanceListener rebalanceListener(
-      ApplicationRecordHandler recordHandler,
-      AdderResults adderResults,
-      StateRepository stateRepository,
-      ApplicationProperties properties)
-  {
-    return new ApplicationRebalanceListener(
-        recordHandler,
-        adderResults,
-        stateRepository,
-        properties.getClientId());
-  }
-
   @Bean
   public EndlessConsumer<String, String> endlessConsumer(
       KafkaConsumer<String, String> kafkaConsumer,
       ExecutorService executor,
-      ApplicationRebalanceListener rebalanceListener,
       ApplicationRecordHandler recordHandler,
       ApplicationProperties properties)
   {
@@ -61,7 +46,6 @@ public class ApplicationConfiguration
             properties.getClientId(),
             properties.getTopic(),
             kafkaConsumer,
-            rebalanceListener,
             recordHandler);
   }
 
diff --git a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java
deleted file mode 100644 (file)
index e214a14..0000000
+++ /dev/null
@@ -1,51 +0,0 @@
-package de.juplo.kafka;
-
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.common.TopicPartition;
-
-import java.util.*;
-
-
-@RequiredArgsConstructor
-@Slf4j
-public class ApplicationRebalanceListener implements ConsumerRebalanceListener
-{
-  private final ApplicationRecordHandler recordHandler;
-  private final AdderResults adderResults;
-  private final StateRepository stateRepository;
-  private final String id;
-
-  private final Set<Integer> partitions = new HashSet<>();
-
-  @Override
-  public void onPartitionsAssigned(Collection<TopicPartition> partitions)
-  {
-    partitions.forEach(tp ->
-    {
-      Integer partition = tp.partition();
-      log.info("{} - adding partition: {}", id, partition);
-    });
-  }
-
-  @Override
-  public void onPartitionsRevoked(Collection<TopicPartition> partitions)
-  {
-    partitions.forEach(tp ->
-    {
-      Integer partition = tp.partition();
-      log.info("{} - removing partition: {}", id, partition);
-      Map<String, AdderResult> state = recordHandler.getState(partition).getState();
-      for (String user : state.keySet())
-      {
-        log.info(
-            "{} - Saved state for partition={}|user={}: {}",
-            id,
-            partition,
-            user,
-            state.get(user));
-      }
-    });
-  }
-}
index 00678c4..3e7310e 100644 (file)
@@ -25,7 +25,6 @@ public class EndlessConsumer<K, V> implements Runnable
   private final String id;
   private final String topic;
   private final Consumer<K, V> consumer;
-  private final ConsumerRebalanceListener rebalanceListener;
   private final RecordHandler<K, V> recordHandler;
 
   private final Lock lock = new ReentrantLock();
@@ -42,7 +41,7 @@ public class EndlessConsumer<K, V> implements Runnable
     try
     {
       log.info("{} - Subscribing to topic {}", id, topic);
-      consumer.subscribe(Arrays.asList(topic), rebalanceListener);
+      consumer.subscribe(Arrays.asList(topic));
 
       while (true)
       {
index 8849317..e63c5ce 100644 (file)
@@ -67,8 +67,6 @@ abstract class GenericApplicationTests<K, V>
        @Autowired
        MongoProperties mongoProperties;
        @Autowired
-       ConsumerRebalanceListener rebalanceListener;
-       @Autowired
        RecordHandler<K, V> recordHandler;
 
        KafkaProducer<Bytes, Bytes> testRecordProducer;
@@ -376,7 +374,6 @@ abstract class GenericApplicationTests<K, V>
                                                properties.getClientId(),
                                                properties.getTopic(),
                                                kafkaConsumer,
-                                               rebalanceListener,
                                                captureOffsetAndExecuteTestHandler);
 
                endlessConsumer.start();