X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FSimpleConsumer.java;h=913ffd52cfe5aaa0ebc140452b974e237469c2b4;hb=HEAD;hp=eaa95666ba81bb229697519d38583d4ec42533d0;hpb=e6b4f984fdd50c24485337d0fd335233b9f0e299;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/SimpleConsumer.java b/src/main/java/de/juplo/kafka/SimpleConsumer.java index eaa9566..913ffd5 100644 --- a/src/main/java/de/juplo/kafka/SimpleConsumer.java +++ b/src/main/java/de/juplo/kafka/SimpleConsumer.java @@ -3,6 +3,7 @@ package de.juplo.kafka; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.serialization.StringDeserializer; @@ -19,7 +20,7 @@ import java.util.concurrent.locks.ReentrantLock; public class SimpleConsumer { private long consumed = 0; - private KafkaConsumer consumer; + private Consumer consumer; private Lock lock = new ReentrantLock(); private Condition stopped = lock.newCondition(); @@ -29,14 +30,14 @@ public class SimpleConsumer // tag::create[] Properties props = new Properties(); props.put("bootstrap.servers", ":9092"); - props.put("group.id", "my-consumer"); // << Used for Offset-Commits + props.put("group.id", "my-consumer"); // << Used for Offset-Management // end::create[] props.put("auto.offset.reset", "earliest"); // tag::create[] props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", StringDeserializer.class.getName()); - KafkaConsumer consumer = new KafkaConsumer<>(props); + Consumer consumer = new KafkaConsumer<>(props); // end::create[] this.consumer = consumer; } @@ -79,7 +80,8 @@ public class SimpleConsumer } catch(WakeupException e) { - log.info("{} - RIIING!", id); + log.info("{} - Closing the KafkaConsumer", id); + consumer.close(); } catch(Exception e) { @@ -87,11 +89,10 @@ public class SimpleConsumer } finally { + log.info("{} - Shutting down..."); this.lock.lock(); try { - log.info("{} - Closing the KafkaConsumer", id); - consumer.close(); log.info("{} - DONE!"); stopped.signal(); }