Beispiele verwenden das Interface, um die erzeugte Instanz abzulegen
authorKai Moritz <kai@juplo.de>
Fri, 9 Jun 2023 09:16:42 +0000 (11:16 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 11 Jun 2023 18:52:58 +0000 (20:52 +0200)
src/main/java/de/juplo/kafka/SimpleConsumer.java

index 9741e67..cee2165 100644 (file)
@@ -3,6 +3,7 @@ package de.juplo.kafka;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.serialization.StringDeserializer;
@@ -17,7 +18,7 @@ public class SimpleConsumer
 {
   private final String id;
   private final String topic;
-  private final KafkaConsumer<String, String> consumer;
+  private final Consumer<String, String> consumer;
 
   private volatile boolean running = false;
   private long consumed = 0;
@@ -33,10 +34,9 @@ public class SimpleConsumer
     props.put("key.deserializer", StringDeserializer.class.getName());
     props.put("value.deserializer", StringDeserializer.class.getName());
 
-    consumer = new KafkaConsumer<>(props);
-
-    this.topic = topic;
     this.id = clientId;
+    this.topic = topic;
+    consumer = new KafkaConsumer<>(props);
   }