Refactor: DRY für state-change zu ASSIGNED
authorKai Moritz <kai@juplo.de>
Sat, 2 Nov 2024 10:21:28 +0000 (11:21 +0100)
committerKai Moritz <kai@juplo.de>
Sun, 10 Nov 2024 13:27:08 +0000 (14:27 +0100)
src/main/java/de/juplo/kafka/ExampleConsumer.java

index 0b57140..b22f5cc 100644 (file)
@@ -177,19 +177,8 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
     counterState.setCounterState(key, Long.parseLong(value));
     if (offset + 1 == stateEndOffsets[partition])
     {
-      log.info(
-        "{} - Restoring of state for partition {} done! New partition-state is ASSIGNED!", id, partition);
-      partitionStates[partition] = PartitionState.ASSIGNED;
-
-      TopicPartition tp;
-
-      tp = new TopicPartition(stateTopic, partition);
-      log.info("{} - Pausing state partition {}...", id, tp);
-      consumer.pause(List.of(tp));
-
-      tp = new TopicPartition(topic, partition);
-      log.info("{} - Resuming message partition {}...", id, tp);
-      consumer.resume(List.of(tp));
+      log.info("{} - Restoring of state for partition {} done!", id, partition);
+      stateAssigned(partition);
     }
     else
     {
@@ -360,17 +349,8 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
       }
       else
       {
-        log.info(
-          "{} - State is up-to-date for message partition {}. New partition-state is ASSIGNED",
-          id,
-          messagePartition);
-        partitionStates[partition] = PartitionState.ASSIGNED;
-
-        log.info("{} - Pausing state partition {}...", id, statePartition);
-        consumer.pause(List.of(statePartition));
-
-        log.info("{} - Resuming message partition {}...", id, messagePartition);
-        consumer.resume(List.of(messagePartition));
+        log.info("{} - State is up-to-date for message partition {}", id, messagePartition);
+        stateAssigned(partition);
       }
     });
   }
@@ -406,6 +386,25 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
     });
   }
 
+  private void stateAssigned(int partition)
+  {
+    log.info(
+      "{} - State-change for partition {}: {} -> ASSIGNED",
+      id,
+      partition,
+      partitionStates[partition]);
+
+    partitionStates[partition] = PartitionState.ASSIGNED;
+
+    TopicPartition statePartition = new TopicPartition(stateTopic, partition);
+    log.info("{} - Pausing state partition {}...", id, statePartition);
+    consumer.pause(List.of(statePartition));
+
+    TopicPartition messagePartition = new TopicPartition(topic, partition);
+    log.info("{} - Resuming message partition {}...", id, messagePartition);
+    consumer.resume(List.of(messagePartition));
+  }
+
   public void shutdown() throws InterruptedException
   {
     log.info("{} joining the worker-thread...", id);