X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FEndlessConsumer.java;fp=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FEndlessConsumer.java;h=00678c45cf344dd43e5e4d5b5bdcbea74bd1a75d;hb=2d25525ef70a90709edc48bd9542d1b08a2888a2;hp=02385211cdf8470df6973f6bb608095dcebe501f;hpb=bfddb34a846a27a477d97eaa4db9221afbd6dbba;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index 0238521..00678c4 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -25,7 +25,7 @@ public class EndlessConsumer implements Runnable private final String id; private final String topic; private final Consumer consumer; - private final PollIntervalAwareConsumerRebalanceListener rebalanceListener; + private final ConsumerRebalanceListener rebalanceListener; private final RecordHandler recordHandler; private final Lock lock = new ReentrantLock(); @@ -67,8 +67,6 @@ public class EndlessConsumer implements Runnable consumed++; } - - rebalanceListener.beforeNextPoll(); } } catch(WakeupException e)