{
log.info("{} - Adding partition {}", id, partition);
assignedPartitions.add(partition);
- stateRestoring(partition.partition());
}
else
{
assignedPartitions.forEach(messagePartition ->
{
int partition = messagePartition.partition();
- TopicPartition statePartition = new TopicPartition(this.stateTopic, partition);
+ TopicPartition statePartition = new TopicPartition(this.stateTopic, partition);
long stateBeginningOffset = consumer
.beginningOffsets(List.of(statePartition))
.get(statePartition);
if (stateBeginningOffset < stateEndOffsets[partition])
{
- log.info(
- "{} - Seeking to first offset {} for state partition {}",
- id,
- stateBeginningOffset,
- statePartition);
- consumer.seek(statePartition, stateBeginningOffset);
-
- log.info("{} - Pausing message partition {}", id, messagePartition);
- consumer.pause(List.of(messagePartition));
-
- log.info("{} - Resuming state partition {}", id, statePartition);
- consumer.resume(List.of(statePartition));
+ stateRestoring(partition, stateBeginningOffset);
}
else
{
});
}
- private void stateRestoring(int partition)
+ private void stateRestoring(int partition, long stateBeginningOffset)
{
log.info(
"{} - Changing partition-state for {}: {} -> RESTORING",
id,
partition,
phaser.getRegisteredParties());
+
+ TopicPartition messagePartition = new TopicPartition(this.topic, partition);
+ log.info("{} - Pausing message partition {}", id, messagePartition);
+ consumer.pause(List.of(messagePartition));
+
+ TopicPartition statePartition = new TopicPartition(this.stateTopic, partition);
+ log.info(
+ "{} - Seeking to first offset {} for state partition {}",
+ id,
+ stateBeginningOffset,
+ statePartition);
+ consumer.seek(statePartition, stateBeginningOffset);
+ log.info("{} - Resuming state partition {}", id, statePartition);
+ consumer.resume(List.of(statePartition));
}
private void stateAssigned(int partition)