for (ConsumerRecord<String, AbstractMessageTo> record : records)
{
handleMessage(record);
- updateNextOffset(record.partition(), record.offset() + 1);
+ this.nextOffset[record.partition()] = record.offset() + 1;
}
+ updateChannelState();
}
catch (WakeupException e)
{
log.info("Exiting normally");
}
- private void updateNextOffset(int partition, long nextOffset)
+ private void updateChannelState()
{
- this.nextOffset[partition] = nextOffset;
if (channelState == ChannelState.LOAD_IN_PROGRESS)
{
boolean loadInProgress = IntStream