From f8b7bbf7df856a257e545c01b0dffd7b87888fae Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 2 Nov 2024 13:58:35 +0100 Subject: [PATCH] =?utf8?q?Refactor:=20Handling=20von=20pause/resume=20voll?= =?utf8?q?st=C3=A4ndig=20in=20State-Change-Methoden?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- .../java/de/juplo/kafka/ExampleConsumer.java | 32 ++++++++++--------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index 84e5057..4177d76 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -297,7 +297,6 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener { log.info("{} - Adding partition {}", id, partition); assignedPartitions.add(partition); - stateRestoring(partition.partition()); } else { @@ -313,26 +312,15 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener assignedPartitions.forEach(messagePartition -> { int partition = messagePartition.partition(); - TopicPartition statePartition = new TopicPartition(this.stateTopic, partition); + TopicPartition statePartition = new TopicPartition(this.stateTopic, partition); long stateBeginningOffset = consumer .beginningOffsets(List.of(statePartition)) .get(statePartition); if (stateBeginningOffset < stateEndOffsets[partition]) { - log.info( - "{} - Seeking to first offset {} for state partition {}", - id, - stateBeginningOffset, - statePartition); - consumer.seek(statePartition, stateBeginningOffset); - - log.info("{} - Pausing message partition {}", id, messagePartition); - consumer.pause(List.of(messagePartition)); - - log.info("{} - Resuming state partition {}", id, statePartition); - consumer.resume(List.of(statePartition)); + stateRestoring(partition, stateBeginningOffset); } else { @@ -362,7 +350,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener }); } - private void stateRestoring(int partition) + private void stateRestoring(int partition, long stateBeginningOffset) { log.info( "{} - Changing partition-state for {}: {} -> RESTORING", @@ -377,6 +365,20 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener id, partition, phaser.getRegisteredParties()); + + TopicPartition messagePartition = new TopicPartition(this.topic, partition); + log.info("{} - Pausing message partition {}", id, messagePartition); + consumer.pause(List.of(messagePartition)); + + TopicPartition statePartition = new TopicPartition(this.stateTopic, partition); + log.info( + "{} - Seeking to first offset {} for state partition {}", + id, + stateBeginningOffset, + statePartition); + consumer.seek(statePartition, stateBeginningOffset); + log.info("{} - Resuming state partition {}", id, statePartition); + consumer.resume(List.of(statePartition)); } private void stateAssigned(int partition) -- 2.20.1