Refaktorisierung für Tests - KafkaConsumer als eigenständige Bean
[demos/kafka/training] / src / main / java / de / juplo / kafka / EndlessConsumer.java
index 6af3765..0bf5925 100644 (file)
@@ -1,13 +1,10 @@
 package de.juplo.kafka;
 
+import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.*;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.serialization.StringDeserializer;
 
 import javax.annotation.PreDestroy;
 import java.time.Duration;
@@ -20,59 +17,29 @@ import java.util.concurrent.locks.ReentrantLock;
 
 
 @Slf4j
+@RequiredArgsConstructor
 public class EndlessConsumer implements Runnable
 {
   private final ExecutorService executor;
-  private final String bootstrapServer;
-  private final String groupId;
   private final String id;
   private final String topic;
-  private final String autoOffsetReset;
+  private final Consumer<String, String> consumer;
 
   private final Lock lock = new ReentrantLock();
   private final Condition condition = lock.newCondition();
   private boolean running = false;
   private Exception exception;
   private long consumed = 0;
-  private KafkaConsumer<String, String> consumer = null;
-
 
   private final Map<Integer, Map<String, Long>> seen = new HashMap<>();
   private final Map<Integer, Long> offsets = new HashMap<>();
 
 
-  public EndlessConsumer(
-      ExecutorService executor,
-      String bootstrapServer,
-      String groupId,
-      String clientId,
-      String topic,
-      String autoOffsetReset)
-  {
-    this.executor = executor;
-    this.bootstrapServer = bootstrapServer;
-    this.groupId = groupId;
-    this.id = clientId;
-    this.topic = topic;
-    this.autoOffsetReset = autoOffsetReset;
-  }
-
   @Override
   public void run()
   {
     try
     {
-      Properties props = new Properties();
-      props.put("bootstrap.servers", bootstrapServer);
-      props.put("group.id", groupId);
-      props.put("client.id", id);
-      props.put("auto.offset.reset", autoOffsetReset);
-      props.put("metadata.max.age.ms", "1000");
-      props.put("key.deserializer", StringDeserializer.class.getName());
-      props.put("value.deserializer", StringDeserializer.class.getName());
-
-      this.consumer = new KafkaConsumer<>(props);
-
       log.info("{} - Subscribing to topic {}", id, topic);
       consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener()
       {
@@ -153,7 +120,8 @@ public class EndlessConsumer implements Runnable
     }
     catch(WakeupException e)
     {
-      log.info("{} - RIIING!", id);
+      log.info("{} - RIIING! Request to stop consumption - commiting current offsets!", id);
+      consumer.commitSync();
       shutdown();
     }
     catch(Exception e)
@@ -163,8 +131,6 @@ public class EndlessConsumer implements Runnable
     }
     finally
     {
-      log.info("{} - Closing the KafkaConsumer", id);
-      consumer.close();
       log.info("{} - Consumer-Thread exiting", id);
     }
   }
@@ -179,9 +145,25 @@ public class EndlessConsumer implements Runnable
     lock.lock();
     try
     {
-      running = false;
-      exception = e;
-      condition.signal();
+      try
+      {
+        log.info("{} - Unsubscribing from topic {}", id, topic);
+        consumer.unsubscribe();
+      }
+      catch (Exception ue)
+      {
+        log.error(
+            "{} - Error while unsubscribing from topic {}: {}",
+            id,
+            topic,
+            ue.toString());
+      }
+      finally
+      {
+        running = false;
+        exception = e;
+        condition.signal();
+      }
     }
     finally
     {