Refaktorisierung für Tests - KafkaConsumer als eigenständige Bean
authorKai Moritz <kai@juplo.de>
Sat, 9 Apr 2022 09:21:43 +0000 (11:21 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 10 Apr 2022 20:26:35 +0000 (22:26 +0200)
* Der KafkaConsumer wird als eigenständige Bean erzeugt
* Die Bean wird dem EndlessConsumer im Konstruktor übergeben
* Dafür muss der Lebenszyklus der KafkaConsumer-Bean von dem der
  EndlessConsumer-Bean getrennt werden:
** close() darf nicht mehr im finally-Block im EndlessConsumer aufgerufen
   werden
** Stattdessen muss close() als Destroy-Methode der Bean definiert werden
** Für start/stop muss stattdessen unsubscribe() im finally-Block aufgerufen
   werden
** Da unsubscribe() die Offset-Position nicht committet, muss explizit
   ein Offset-Commit beauftragt werden, wenn der Consumer regulär
   gestoppt wird (WakeupException)

src/main/java/de/juplo/kafka/Application.java
src/main/java/de/juplo/kafka/EndlessConsumer.java

index de4b66d..5226d6b 100644 (file)
@@ -1,11 +1,14 @@
 package de.juplo.kafka;
 
-import org.springframework.beans.factory.annotation.Autowired;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.annotation.Bean;
 
+import java.util.Properties;
 import java.util.concurrent.Executors;
 
 
@@ -13,27 +16,40 @@ import java.util.concurrent.Executors;
 @EnableConfigurationProperties(ApplicationProperties.class)
 public class Application
 {
-  @Autowired
-  ApplicationProperties properties;
-
-
   @Bean
-  public EndlessConsumer consumer()
+  public EndlessConsumer endlessConsumer(
+      KafkaConsumer<String, String> kafkaConsumer,
+      ApplicationProperties properties)
   {
     EndlessConsumer consumer =
         new EndlessConsumer(
             Executors.newFixedThreadPool(1),
-            properties.getBootstrapServer(),
-            properties.getGroupId(),
             properties.getClientId(),
             properties.getTopic(),
-            properties.getAutoOffsetReset());
+            kafkaConsumer);
 
     consumer.start();
 
     return consumer;
   }
 
+  @Bean(destroyMethod = "close")
+  public KafkaConsumer<String, String> kafkaConsumer(ApplicationProperties properties)
+  {
+    Properties props = new Properties();
+
+    props.put("bootstrap.servers", properties.getBootstrapServer());
+    props.put("group.id", properties.getGroupId());
+    props.put("client.id", properties.getClientId());
+    props.put("auto.offset.reset", properties.getAutoOffsetReset());
+    props.put("metadata.max.age.ms", "1000");
+    props.put("key.deserializer", StringDeserializer.class.getName());
+    props.put("value.deserializer", LongDeserializer.class.getName());
+
+    return new KafkaConsumer<>(props);
+  }
+
+
   public static void main(String[] args)
   {
     SpringApplication.run(Application.class, args);
index 6af3765..0bf5925 100644 (file)
@@ -1,13 +1,10 @@
 package de.juplo.kafka;
 
+import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.*;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.serialization.StringDeserializer;
 
 import javax.annotation.PreDestroy;
 import java.time.Duration;
@@ -20,59 +17,29 @@ import java.util.concurrent.locks.ReentrantLock;
 
 
 @Slf4j
+@RequiredArgsConstructor
 public class EndlessConsumer implements Runnable
 {
   private final ExecutorService executor;
-  private final String bootstrapServer;
-  private final String groupId;
   private final String id;
   private final String topic;
-  private final String autoOffsetReset;
+  private final Consumer<String, String> consumer;
 
   private final Lock lock = new ReentrantLock();
   private final Condition condition = lock.newCondition();
   private boolean running = false;
   private Exception exception;
   private long consumed = 0;
-  private KafkaConsumer<String, String> consumer = null;
-
 
   private final Map<Integer, Map<String, Long>> seen = new HashMap<>();
   private final Map<Integer, Long> offsets = new HashMap<>();
 
 
-  public EndlessConsumer(
-      ExecutorService executor,
-      String bootstrapServer,
-      String groupId,
-      String clientId,
-      String topic,
-      String autoOffsetReset)
-  {
-    this.executor = executor;
-    this.bootstrapServer = bootstrapServer;
-    this.groupId = groupId;
-    this.id = clientId;
-    this.topic = topic;
-    this.autoOffsetReset = autoOffsetReset;
-  }
-
   @Override
   public void run()
   {
     try
     {
-      Properties props = new Properties();
-      props.put("bootstrap.servers", bootstrapServer);
-      props.put("group.id", groupId);
-      props.put("client.id", id);
-      props.put("auto.offset.reset", autoOffsetReset);
-      props.put("metadata.max.age.ms", "1000");
-      props.put("key.deserializer", StringDeserializer.class.getName());
-      props.put("value.deserializer", StringDeserializer.class.getName());
-
-      this.consumer = new KafkaConsumer<>(props);
-
       log.info("{} - Subscribing to topic {}", id, topic);
       consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener()
       {
@@ -153,7 +120,8 @@ public class EndlessConsumer implements Runnable
     }
     catch(WakeupException e)
     {
-      log.info("{} - RIIING!", id);
+      log.info("{} - RIIING! Request to stop consumption - commiting current offsets!", id);
+      consumer.commitSync();
       shutdown();
     }
     catch(Exception e)
@@ -163,8 +131,6 @@ public class EndlessConsumer implements Runnable
     }
     finally
     {
-      log.info("{} - Closing the KafkaConsumer", id);
-      consumer.close();
       log.info("{} - Consumer-Thread exiting", id);
     }
   }
@@ -179,9 +145,25 @@ public class EndlessConsumer implements Runnable
     lock.lock();
     try
     {
-      running = false;
-      exception = e;
-      condition.signal();
+      try
+      {
+        log.info("{} - Unsubscribing from topic {}", id, topic);
+        consumer.unsubscribe();
+      }
+      catch (Exception ue)
+      {
+        log.error(
+            "{} - Error while unsubscribing from topic {}: {}",
+            id,
+            topic,
+            ue.toString());
+      }
+      finally
+      {
+        running = false;
+        exception = e;
+        condition.signal();
+      }
     }
     finally
     {