@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions)
{
- partitions.forEach(partition ->
- {
- if (partition.topic().equals(topic))
- {
- log.info("{} - Adding partition {}", id, partition);
- assignedPartitions.add(partition);
- }
- else
- {
- long endOffset = consumer
- .endOffsets(List.of(partition))
- .get(partition)
- .longValue();
- log.info("{} - Found end-offset {} for state partition {}", id, endOffset, partition);
- stateEndOffsets[partition.partition()] = endOffset;
- }
- });
+ partitions
+ .stream()
+ .filter(partition -> partition.topic().equals(topic))
+ .forEach(partition -> restoreAndAssign(partition.partition()));
+ }
- assignedPartitions.forEach(messagePartition ->
- {
- int partition = messagePartition.partition();
+ @Override
+ public synchronized void onPartitionsRevoked(Collection<TopicPartition> partitions)
+ {
+ partitions
+ .stream()
+ .filter(partition -> partition.topic().equals(topic))
+ .forEach(partition -> revoke(partition.partition()));
+ }
- TopicPartition statePartition = new TopicPartition(this.stateTopic, partition);
- long stateBeginningOffset = consumer
- .beginningOffsets(List.of(statePartition))
- .get(statePartition);
+ private void restoreAndAssign(int partition)
+ {
+ TopicPartition statePartition = new TopicPartition(this.stateTopic, partition);
- if (stateBeginningOffset < stateEndOffsets[partition])
- {
- stateRestoring(partition, stateBeginningOffset);
- }
- else
- {
- log.info("{} - State is up-to-date for message partition {}", id, messagePartition);
- stateAssigned(partition);
- }
- });
+ long stateEndOffset = consumer
+ .endOffsets(List.of(statePartition))
+ .get(statePartition)
+ .longValue();
+
+ long stateBeginningOffset = consumer
+ .beginningOffsets(List.of(statePartition))
+ .get(statePartition);
+
+ log.info(
+ "{} - Found beginning-offset {} and end-offset {} for state partition {}",
+ id,
+ stateBeginningOffset,
+ stateEndOffset,
+ partition);
+
+ if (stateBeginningOffset < stateEndOffset)
+ {
+ stateRestoring(partition, stateBeginningOffset, stateEndOffset);
+ }
+ else
+ {
+ log.info("{} - State is up-to-date for partition {}", id, partition);
+ stateAssigned(partition);
+ }
}
- @Override
- public synchronized void onPartitionsRevoked(Collection<TopicPartition> partitions)
+ private void revoke(int partition)
{
- partitions.forEach(partition ->
+ PartitionState partitionState = partitionStates[partition];
+ switch (partitionState)
{
- if (partition.topic().equals(topic))
- {
- log.info("{} - Revoking partition {}", id, partition);
- assignedPartitions.remove(partition);
-
- PartitionState partitionState = partitionStates[partition.partition()];
- switch (partitionState)
- {
- case RESTORING, ASSIGNED -> stateUnassigned(partition.partition());
- case UNASSIGNED -> log.warn("{} - partition {} in state {} was revoked!", id, partition, partitionState);
- }
- }
- });
+ case RESTORING, ASSIGNED -> stateUnassigned(partition);
+ case UNASSIGNED -> log.warn("{} - partition {} in state {} was revoked!", id, partition, partitionState);
+ }
}
- private void stateRestoring(int partition, long stateBeginningOffset)
+ private void stateRestoring(int partition, long stateBeginningOffset, long stateEndOffset)
{
log.info(
"{} - Changing partition-state for {}: {} -> RESTORING",
partitionStates[partition]);
partitionStates[partition] = PartitionState.RESTORING;
- phaser.register();
- log.info(
- "{} - Registered new party for newly assigned partition {}. New total number of parties: {}",
- id,
- partition,
- phaser.getRegisteredParties());
-
TopicPartition messagePartition = new TopicPartition(this.topic, partition);
log.info("{} - Pausing message partition {}", id, messagePartition);
consumer.pause(List.of(messagePartition));
stateBeginningOffset,
statePartition);
consumer.seek(statePartition, stateBeginningOffset);
+ stateEndOffsets[partition] = stateEndOffset;
log.info("{} - Resuming state partition {}", id, statePartition);
consumer.resume(List.of(statePartition));
}
consumer.pause(List.of(statePartition));
TopicPartition messagePartition = new TopicPartition(topic, partition);
+ log.info("{} - Adding partition {} to the assigned partitions", id, messagePartition);
+ assignedPartitions.add(messagePartition);
+ phaser.register();
+ log.info(
+ "{} - Registered new party for newly assigned partition {}. New total number of parties: {}",
+ id,
+ messagePartition,
+ phaser.getRegisteredParties());
log.info("{} - Resuming message partition {}...", id, messagePartition);
consumer.resume(List.of(messagePartition));
}
private void stateUnassigned(int partition)
{
+ PartitionState oldPartitionState = partitionStates[partition];
+
log.info(
"{} - State-change for partition {}: {} -> UNASSIGNED",
id,
partition,
- partitionStates[partition]);
+ oldPartitionState);
partitionStates[partition] = PartitionState.UNASSIGNED;
- phaser.arriveAndDeregister();
- log.info(
- "{} - Deregistered party for revoked partition {}. New total number of parties: {}",
- id,
- partition,
- phaser.getRegisteredParties());
+ if (oldPartitionState == PartitionState.ASSIGNED)
+ {
+ TopicPartition messagePartition = new TopicPartition(topic, partition);
+ log.info("{} - Revoking partition {}", id, messagePartition);
+ assignedPartitions.remove(messagePartition);
+
+ phaser.arriveAndDeregister();
+ log.info(
+ "{} - Deregistered party for revoked partition {}. New total number of parties: {}",
+ id,
+ messagePartition,
+ phaser.getRegisteredParties());
+ }
}