* If the last seen offset and the current offset differ, although the
partition did not contain any messages between this offsets, the loading
process got stuck, because the position never advanced.
* Therefore, the actual position, that is compared against the read
end-offset, has to be requested from the consumer.
return IntStream
.range(0, numShards)
.filter(shard -> isShardOwned[shard])
- .allMatch(shard -> nextOffset[shard] >= currentOffset[shard]);
+ .allMatch(shard ->
+ {
+ TopicPartition partition = new TopicPartition(topic, shard);
+ long position = consumer.position(partition);
+ return position >= currentOffset[shard];
+ });
}
private void pauseAllOwnedPartions()