Refactor: Handling von pause/resume vollständig in State-Change-Methoden
authorKai Moritz <kai@juplo.de>
Sat, 2 Nov 2024 12:58:35 +0000 (13:58 +0100)
committerKai Moritz <kai@juplo.de>
Sun, 10 Nov 2024 13:27:08 +0000 (14:27 +0100)
src/main/java/de/juplo/kafka/ExampleConsumer.java

index 84e5057..4177d76 100644 (file)
@@ -297,7 +297,6 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
       {
         log.info("{} - Adding partition {}", id, partition);
         assignedPartitions.add(partition);
-        stateRestoring(partition.partition());
       }
       else
       {
@@ -313,26 +312,15 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
     assignedPartitions.forEach(messagePartition ->
     {
       int partition = messagePartition.partition();
-      TopicPartition statePartition = new TopicPartition(this.stateTopic, partition);
 
+      TopicPartition statePartition = new TopicPartition(this.stateTopic, partition);
       long stateBeginningOffset = consumer
         .beginningOffsets(List.of(statePartition))
         .get(statePartition);
 
       if (stateBeginningOffset < stateEndOffsets[partition])
       {
-        log.info(
-          "{} - Seeking to first offset {} for state partition {}",
-          id,
-          stateBeginningOffset,
-          statePartition);
-        consumer.seek(statePartition, stateBeginningOffset);
-
-        log.info("{} - Pausing message partition {}", id, messagePartition);
-        consumer.pause(List.of(messagePartition));
-
-        log.info("{} - Resuming state partition {}", id, statePartition);
-        consumer.resume(List.of(statePartition));
+        stateRestoring(partition, stateBeginningOffset);
       }
       else
       {
@@ -362,7 +350,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
     });
   }
 
-  private void stateRestoring(int partition)
+  private void stateRestoring(int partition, long stateBeginningOffset)
   {
     log.info(
       "{} - Changing partition-state for {}: {} -> RESTORING",
@@ -377,6 +365,20 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
       id,
       partition,
       phaser.getRegisteredParties());
+
+    TopicPartition messagePartition = new TopicPartition(this.topic, partition);
+    log.info("{} - Pausing message partition {}", id, messagePartition);
+    consumer.pause(List.of(messagePartition));
+
+    TopicPartition statePartition = new TopicPartition(this.stateTopic, partition);
+    log.info(
+      "{} - Seeking to first offset {} for state partition {}",
+      id,
+      stateBeginningOffset,
+      statePartition);
+    consumer.seek(statePartition, stateBeginningOffset);
+    log.info("{} - Resuming state partition {}", id, statePartition);
+    consumer.resume(List.of(statePartition));
   }
 
   private void stateAssigned(int partition)