From: Kai Moritz Date: Sat, 2 Nov 2024 14:03:10 +0000 (+0100) Subject: Fix & Refactor: Restore-Behandlung wurde _allen_ aktiven Partitionen zuteil X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=5abcc2a99e3e6ff04ce9afcc6f402592f2990b3c;p=demos%2Fkafka%2Ftraining Fix & Refactor: Restore-Behandlung wurde _allen_ aktiven Partitionen zuteil * Durch das vorausgehende Refactoring wurde deutlich, dass die Behandlung, die den _neu_ hinzugefügten Partitionen zugedacht war, allen in `assignedPartitions` vermerkten Partitionen wiederfahren ist. * Dies ist für den aktuellen Entwicklungsstand ggf. egal, da der wegen dem Co-Partitioning (noch!) benötigte `RangeAssignor` eh _zuerst alle_ Partitionen entzieht, bevor er _dann alle_ neu zuteilt. * Da der Code aber auch mit dem neuen Consumer-Rebalance Protokoll funktionieren muss, wurde das Refactoring hier fortgeführt und so vollendet, dass nun _alle_ Aktionenen _nur noch_ von den Callbacks `onPartitionsAssigned()` und `onPartitionsRevoked()` ausgeht. --- diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index a859a9a..42daf03 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -291,66 +291,63 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener @Override public void onPartitionsAssigned(Collection partitions) { - partitions.forEach(partition -> - { - if (partition.topic().equals(topic)) - { - log.info("{} - Adding partition {}", id, partition); - assignedPartitions.add(partition); - } - else - { - long endOffset = consumer - .endOffsets(List.of(partition)) - .get(partition) - .longValue(); - log.info("{} - Found end-offset {} for state partition {}", id, endOffset, partition); - stateEndOffsets[partition.partition()] = endOffset; - } - }); + partitions + .stream() + .filter(partition -> partition.topic().equals(topic)) + .forEach(partition -> restoreAndAssign(partition.partition())); + } - assignedPartitions.forEach(messagePartition -> - { - int partition = messagePartition.partition(); + @Override + public synchronized void onPartitionsRevoked(Collection partitions) + { + partitions + .stream() + .filter(partition -> partition.topic().equals(topic)) + .forEach(partition -> revoke(partition.partition())); + } - TopicPartition statePartition = new TopicPartition(this.stateTopic, partition); - long stateBeginningOffset = consumer - .beginningOffsets(List.of(statePartition)) - .get(statePartition); + private void restoreAndAssign(int partition) + { + TopicPartition statePartition = new TopicPartition(this.stateTopic, partition); - if (stateBeginningOffset < stateEndOffsets[partition]) - { - stateRestoring(partition, stateBeginningOffset); - } - else - { - log.info("{} - State is up-to-date for message partition {}", id, messagePartition); - stateAssigned(partition); - } - }); + long stateEndOffset = consumer + .endOffsets(List.of(statePartition)) + .get(statePartition) + .longValue(); + + long stateBeginningOffset = consumer + .beginningOffsets(List.of(statePartition)) + .get(statePartition); + + log.info( + "{} - Found beginning-offset {} and end-offset {} for state partition {}", + id, + stateBeginningOffset, + stateEndOffset, + partition); + + if (stateBeginningOffset < stateEndOffset) + { + stateRestoring(partition, stateBeginningOffset, stateEndOffset); + } + else + { + log.info("{} - State is up-to-date for partition {}", id, partition); + stateAssigned(partition); + } } - @Override - public synchronized void onPartitionsRevoked(Collection partitions) + private void revoke(int partition) { - partitions.forEach(partition -> + PartitionState partitionState = partitionStates[partition]; + switch (partitionState) { - if (partition.topic().equals(topic)) - { - log.info("{} - Revoking partition {}", id, partition); - assignedPartitions.remove(partition); - - PartitionState partitionState = partitionStates[partition.partition()]; - switch (partitionState) - { - case RESTORING, ASSIGNED -> stateUnassigned(partition.partition()); - case UNASSIGNED -> log.warn("{} - partition {} in state {} was revoked!", id, partition, partitionState); - } - } - }); + case RESTORING, ASSIGNED -> stateUnassigned(partition); + case UNASSIGNED -> log.warn("{} - partition {} in state {} was revoked!", id, partition, partitionState); + } } - private void stateRestoring(int partition, long stateBeginningOffset) + private void stateRestoring(int partition, long stateBeginningOffset, long stateEndOffset) { log.info( "{} - Changing partition-state for {}: {} -> RESTORING", @@ -359,13 +356,6 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener partitionStates[partition]); partitionStates[partition] = PartitionState.RESTORING; - phaser.register(); - log.info( - "{} - Registered new party for newly assigned partition {}. New total number of parties: {}", - id, - partition, - phaser.getRegisteredParties()); - TopicPartition messagePartition = new TopicPartition(this.topic, partition); log.info("{} - Pausing message partition {}", id, messagePartition); consumer.pause(List.of(messagePartition)); @@ -377,6 +367,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener stateBeginningOffset, statePartition); consumer.seek(statePartition, stateBeginningOffset); + stateEndOffsets[partition] = stateEndOffset; log.info("{} - Resuming state partition {}", id, statePartition); consumer.resume(List.of(statePartition)); } @@ -396,26 +387,43 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener consumer.pause(List.of(statePartition)); TopicPartition messagePartition = new TopicPartition(topic, partition); + log.info("{} - Adding partition {} to the assigned partitions", id, messagePartition); + assignedPartitions.add(messagePartition); + phaser.register(); + log.info( + "{} - Registered new party for newly assigned partition {}. New total number of parties: {}", + id, + messagePartition, + phaser.getRegisteredParties()); log.info("{} - Resuming message partition {}...", id, messagePartition); consumer.resume(List.of(messagePartition)); } private void stateUnassigned(int partition) { + PartitionState oldPartitionState = partitionStates[partition]; + log.info( "{} - State-change for partition {}: {} -> UNASSIGNED", id, partition, - partitionStates[partition]); + oldPartitionState); partitionStates[partition] = PartitionState.UNASSIGNED; - phaser.arriveAndDeregister(); - log.info( - "{} - Deregistered party for revoked partition {}. New total number of parties: {}", - id, - partition, - phaser.getRegisteredParties()); + if (oldPartitionState == PartitionState.ASSIGNED) + { + TopicPartition messagePartition = new TopicPartition(topic, partition); + log.info("{} - Revoking partition {}", id, messagePartition); + assignedPartitions.remove(messagePartition); + + phaser.arriveAndDeregister(); + log.info( + "{} - Deregistered party for revoked partition {}. New total number of parties: {}", + id, + messagePartition, + phaser.getRegisteredParties()); + } }