From: Kai Moritz Date: Sat, 2 Nov 2024 12:58:35 +0000 (+0100) Subject: Refactor: Handling von pause/resume vollständig in State-Change-Methoden X-Git-Tag: consumer/spring-consumer--log-compaction--2024-11-13--si~8 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=f8b7bbf7df856a257e545c01b0dffd7b87888fae;p=demos%2Fkafka%2Ftraining Refactor: Handling von pause/resume vollständig in State-Change-Methoden --- diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index 84e5057..4177d76 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -297,7 +297,6 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener { log.info("{} - Adding partition {}", id, partition); assignedPartitions.add(partition); - stateRestoring(partition.partition()); } else { @@ -313,26 +312,15 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener assignedPartitions.forEach(messagePartition -> { int partition = messagePartition.partition(); - TopicPartition statePartition = new TopicPartition(this.stateTopic, partition); + TopicPartition statePartition = new TopicPartition(this.stateTopic, partition); long stateBeginningOffset = consumer .beginningOffsets(List.of(statePartition)) .get(statePartition); if (stateBeginningOffset < stateEndOffsets[partition]) { - log.info( - "{} - Seeking to first offset {} for state partition {}", - id, - stateBeginningOffset, - statePartition); - consumer.seek(statePartition, stateBeginningOffset); - - log.info("{} - Pausing message partition {}", id, messagePartition); - consumer.pause(List.of(messagePartition)); - - log.info("{} - Resuming state partition {}", id, statePartition); - consumer.resume(List.of(statePartition)); + stateRestoring(partition, stateBeginningOffset); } else { @@ -362,7 +350,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener }); } - private void stateRestoring(int partition) + private void stateRestoring(int partition, long stateBeginningOffset) { log.info( "{} - Changing partition-state for {}: {} -> RESTORING", @@ -377,6 +365,20 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener id, partition, phaser.getRegisteredParties()); + + TopicPartition messagePartition = new TopicPartition(this.topic, partition); + log.info("{} - Pausing message partition {}", id, messagePartition); + consumer.pause(List.of(messagePartition)); + + TopicPartition statePartition = new TopicPartition(this.stateTopic, partition); + log.info( + "{} - Seeking to first offset {} for state partition {}", + id, + stateBeginningOffset, + statePartition); + consumer.seek(statePartition, stateBeginningOffset); + log.info("{} - Resuming state partition {}", id, statePartition); + consumer.resume(List.of(statePartition)); } private void stateAssigned(int partition)