From 127d62c9bebc408da677d9c2ba3e7381cadb424f Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 2 Nov 2024 13:58:35 +0100 Subject: [PATCH] =?utf8?q?Refactor:=20Handling=20von=20pause/resume=20voll?= =?utf8?q?st=C3=A4ndig=20in=20State-Change-Methoden?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- .../java/de/juplo/kafka/ExampleConsumer.java | 32 ++++++++++--------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index 3f00a08..0f09bda 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -289,7 +289,6 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener { log.info("{} - Adding partition {}", id, partition); assignedPartitions.add(partition); - stateRestoring(partition.partition()); } else { @@ -305,26 +304,15 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener assignedPartitions.forEach(messagePartition -> { int partition = messagePartition.partition(); - TopicPartition statePartition = new TopicPartition(this.stateTopic, partition); + TopicPartition statePartition = new TopicPartition(this.stateTopic, partition); long stateBeginningOffset = consumer .beginningOffsets(List.of(statePartition)) .get(statePartition); if (stateBeginningOffset < stateEndOffsets[partition]) { - log.info( - "{} - Seeking to first offset {} for state partition {}", - id, - stateBeginningOffset, - statePartition); - consumer.seek(statePartition, stateBeginningOffset); - - log.info("{} - Pausing message partition {}", id, messagePartition); - consumer.pause(List.of(messagePartition)); - - log.info("{} - Resuming state partition {}", id, statePartition); - consumer.resume(List.of(statePartition)); + stateRestoring(partition, stateBeginningOffset); } else { @@ -354,7 +342,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener }); } - private void stateRestoring(int partition) + private void stateRestoring(int partition, long stateBeginningOffset) { log.info( "{} - Changing partition-state for {}: {} -> RESTORING", @@ -369,6 +357,20 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener id, partition, phaser.getRegisteredParties()); + + TopicPartition messagePartition = new TopicPartition(this.topic, partition); + log.info("{} - Pausing message partition {}", id, messagePartition); + consumer.pause(List.of(messagePartition)); + + TopicPartition statePartition = new TopicPartition(this.stateTopic, partition); + log.info( + "{} - Seeking to first offset {} for state partition {}", + id, + stateBeginningOffset, + statePartition); + consumer.seek(statePartition, stateBeginningOffset); + log.info("{} - Resuming state partition {}", id, statePartition); + consumer.resume(List.of(statePartition)); } private void stateAssigned(int partition) -- 2.20.1