}
catch(WakeupException e)
{
- log.info("{} - RIIING! Request to stop consumption.", id);
+ log.info("{} - RIIING! Request to stop consumption - commiting current offsets!", id);
+ consumer.commitSync();
shutdown();
}
catch(RecordDeserializationException e)
offset,
e.getCause().toString());
+ consumer.commitSync();
shutdown(e);
}
catch(Exception e)