- }
-
- rebalanceListener.beforeNextPoll();
- }
- }
- catch(WakeupException e)
- {
- log.info("{} - RIIING! Request to stop consumption - commiting current offsets!", id);
- consumer.commitSync();
- shutdown();
- }
- catch(RecordDeserializationException e)
- {
- TopicPartition tp = e.topicPartition();
- long offset = e.offset();
- log.error(
- "{} - Could not deserialize message on topic {} with offset={}: {}",
- id,
- tp,
- offset,
- e.getCause().toString());
-
- consumer.commitSync();
- shutdown(e);
- }
- catch(Exception e)
- {
- log.error("{} - Unexpected error: {}", id, e.toString(), e);
- shutdown(e);
- }
- finally
- {
- log.info("{} - Consumer-Thread exiting", id);
- }