import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
}
}
}
+ catch(WakeupException e)
+ {
+ log.info("{}: Wakeup!", id);
+ consumer.close();
+ }
finally
{
log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
Runtime.getRuntime().addShutdownHook(new Thread(() ->
{
- instance.consumer.close();
+ instance.consumer.wakeup();
while (instance.running)
{
log.info("Waiting...");