From: Kai Moritz Date: Sat, 2 Nov 2024 12:37:38 +0000 (+0100) Subject: DRY für state-change zu RESTORING X-Git-Tag: consumer/spring-consumer--log-compaction--2024-11-13--si~9 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=d693223ffb3afc3d3c654278f142616de991bee8;p=demos%2Fkafka%2Ftraining DRY für state-change zu RESTORING --- diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index d852f41..84e5057 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -297,20 +297,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener { log.info("{} - Adding partition {}", id, partition); assignedPartitions.add(partition); - - phaser.register(); - log.info( - "{} - Registered new party for restored assigned partition {}. New total number of parties: {}", - id, - partition, - phaser.getRegisteredParties()); - - log.info( - "{} - Changing partition-state for {}: {} -> RESTORING", - id, - partition, - partitionStates[partition.partition()]); - partitionStates[partition.partition()] = PartitionState.RESTORING; + stateRestoring(partition.partition()); } else { @@ -375,6 +362,23 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener }); } + private void stateRestoring(int partition) + { + log.info( + "{} - Changing partition-state for {}: {} -> RESTORING", + id, + partition, + partitionStates[partition]); + partitionStates[partition] = PartitionState.RESTORING; + + phaser.register(); + log.info( + "{} - Registered new party for newly assigned partition {}. New total number of parties: {}", + id, + partition, + phaser.getRegisteredParties()); + } + private void stateAssigned(int partition) { log.info(