record.value());
}
+ checkRestoreProgress(partition);
+
done[partition.partition()] = true;
});
}
else
{
- handleState(partition, offset, key, value);
+ handleState(partition, key, value);
+ }
+ }
+
+ private void checkRestoreProgress(TopicPartition topicPartition)
+ {
+ int partition = topicPartition.partition();
+
+ if (partitionStates[partition] == State.RESTORING)
+ {
+ long consumerPosition = consumer.position(topicPartition);
+
+ if (consumerPosition + 1 >= stateEndOffsets[partition])
+ {
+ log.info(
+ "{} - Position of consumer is {}. Restoring of state for partition {} done!",
+ id,
+ consumerPosition,
+ topicPartition);
+ stateAssigned(partition);
+ }
+ else
+ {
+ log.debug(
+ "{} - Restored state up to offset {}, end-offset: {}",
+ id,
+ consumerPosition,
+ stateEndOffsets[partition]);
+ }
}
}
private synchronized void handleState(
int partition,
- long offset,
String key,
String value)
{
restoredState[partition].put(key, Long.parseLong(value));
- if (offset + 1 == stateEndOffsets[partition])
- {
- log.info("{} - Restoring of state for partition {} done!", id, partition);
- stateAssigned(partition);
- }
- else
- {
- log.debug(
- "{} - Restored state up to offset {}, end-offset: {}, state: {}={}",
- id,
- offset,
- stateEndOffsets[partition],
- key,
- value);
- }
}
private void handleMessage(