import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
{
private final String id;
private final String topic;
- private final KafkaConsumer<String, String> consumer;
+ private final Consumer<String, String> consumer;
private volatile boolean running = false;
private long consumed = 0;
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
- consumer = new KafkaConsumer<>(props);
-
- this.topic = topic;
this.id = clientId;
+ this.topic = topic;
+ consumer = new KafkaConsumer<>(props);
}