Beispiele verwenden das Interface, um die erzeugte Instanz abzulegen
[demos/kafka/training] / src / main / java / de / juplo / kafka / SimpleConsumer.java
index eaa9566..913ffd5 100644 (file)
@@ -3,6 +3,7 @@ package de.juplo.kafka;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.serialization.StringDeserializer;
@@ -19,7 +20,7 @@ import java.util.concurrent.locks.ReentrantLock;
 public class SimpleConsumer
 {
   private long consumed = 0;
-  private KafkaConsumer<String, String> consumer;
+  private Consumer<String, String> consumer;
   private Lock lock = new ReentrantLock();
   private Condition stopped = lock.newCondition();
 
@@ -29,14 +30,14 @@ public class SimpleConsumer
     // tag::create[]
     Properties props = new Properties();
     props.put("bootstrap.servers", ":9092");
-    props.put("group.id", "my-consumer"); // << Used for Offset-Commits
+    props.put("group.id", "my-consumer"); // << Used for Offset-Management
     // end::create[]
     props.put("auto.offset.reset", "earliest");
     // tag::create[]
     props.put("key.deserializer", StringDeserializer.class.getName());
     props.put("value.deserializer", StringDeserializer.class.getName());
 
-    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
+    Consumer<String, String> consumer = new KafkaConsumer<>(props);
     // end::create[]
     this.consumer = consumer;
   }
@@ -79,7 +80,8 @@ public class SimpleConsumer
     }
     catch(WakeupException e)
     {
-      log.info("{} - RIIING!", id);
+      log.info("{} - Closing the KafkaConsumer", id);
+      consumer.close();
     }
     catch(Exception e)
     {
@@ -87,11 +89,10 @@ public class SimpleConsumer
     }
     finally
     {
+      log.info("{} - Shutting down...");
       this.lock.lock();
       try
       {
-        log.info("{} - Closing the KafkaConsumer", id);
-        consumer.close();
         log.info("{} - DONE!");
         stopped.signal();
       }