Fix & Refactor: Restore-Behandlung wurde _allen_ aktiven Partitionen zuteil
authorKai Moritz <kai@juplo.de>
Sat, 2 Nov 2024 14:03:10 +0000 (15:03 +0100)
committerKai Moritz <kai@juplo.de>
Sun, 10 Nov 2024 13:27:08 +0000 (14:27 +0100)
* Durch das vorausgehende Refactoring wurde deutlich, dass die Behandlung,
  die den _neu_ hinzugefügten Partitionen zugedacht war, allen in
  `assignedPartitions` vermerkten Partitionen wiederfahren ist.
* Dies ist für den aktuellen Entwicklungsstand ggf. egal, da der wegen dem
  Co-Partitioning (noch!) benötigte `RangeAssignor` eh _zuerst alle_
  Partitionen entzieht, bevor er _dann alle_ neu zuteilt.
* Da der Code aber auch mit dem neuen Consumer-Rebalance Protokoll
  funktionieren muss, wurde das Refactoring hier fortgeführt und so
  vollendet, dass nun _alle_ Aktionenen _nur noch_ von den Callbacks
  `onPartitionsAssigned()` und `onPartitionsRevoked()` ausgeht.

src/main/java/de/juplo/kafka/ExampleConsumer.java

index 4177d76..b646f3c 100644 (file)
@@ -291,66 +291,63 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
   @Override
   public void onPartitionsAssigned(Collection<TopicPartition> partitions)
   {
-    partitions.forEach(partition ->
-    {
-      if (partition.topic().equals(topic))
-      {
-        log.info("{} - Adding partition {}", id, partition);
-        assignedPartitions.add(partition);
-      }
-      else
-      {
-        long endOffset = consumer
-          .endOffsets(List.of(partition))
-          .get(partition)
-          .longValue();
-        log.info("{} - Found end-offset {} for state partition {}", id, endOffset, partition);
-        stateEndOffsets[partition.partition()] = endOffset;
-      }
-    });
+    partitions
+      .stream()
+      .filter(partition -> partition.topic().equals(topic))
+      .forEach(partition -> restoreAndAssign(partition.partition()));
+  }
 
-    assignedPartitions.forEach(messagePartition ->
-    {
-      int partition = messagePartition.partition();
+  @Override
+  public synchronized void onPartitionsRevoked(Collection<TopicPartition> partitions)
+  {
+    partitions
+      .stream()
+      .filter(partition -> partition.topic().equals(topic))
+      .forEach(partition -> revoke(partition.partition()));
+  }
 
-      TopicPartition statePartition = new TopicPartition(this.stateTopic, partition);
-      long stateBeginningOffset = consumer
-        .beginningOffsets(List.of(statePartition))
-        .get(statePartition);
+  private void restoreAndAssign(int partition)
+  {
+    TopicPartition statePartition = new TopicPartition(this.stateTopic, partition);
 
-      if (stateBeginningOffset < stateEndOffsets[partition])
-      {
-        stateRestoring(partition, stateBeginningOffset);
-      }
-      else
-      {
-        log.info("{} - State is up-to-date for message partition {}", id, messagePartition);
-        stateAssigned(partition);
-      }
-    });
+    long stateEndOffset = consumer
+      .endOffsets(List.of(statePartition))
+      .get(statePartition)
+      .longValue();
+
+    long stateBeginningOffset = consumer
+      .beginningOffsets(List.of(statePartition))
+      .get(statePartition);
+
+    log.info(
+      "{} - Found beginning-offset {} and end-offset {} for state partition {}",
+      id,
+      stateBeginningOffset,
+      stateEndOffset,
+      partition);
+
+    if (stateBeginningOffset < stateEndOffset)
+    {
+      stateRestoring(partition, stateBeginningOffset, stateEndOffset);
+    }
+    else
+    {
+      log.info("{} - State is up-to-date for partition {}", id, partition);
+      stateAssigned(partition);
+    }
   }
 
-  @Override
-  public synchronized void onPartitionsRevoked(Collection<TopicPartition> partitions)
+  private void revoke(int partition)
   {
-    partitions.forEach(partition ->
+    PartitionState partitionState = partitionStates[partition];
+    switch (partitionState)
     {
-      if (partition.topic().equals(topic))
-      {
-        log.info("{} - Revoking partition {}", id, partition);
-        assignedPartitions.remove(partition);
-
-        PartitionState partitionState = partitionStates[partition.partition()];
-        switch (partitionState)
-        {
-          case RESTORING, ASSIGNED -> stateUnassigned(partition.partition());
-          case UNASSIGNED -> log.warn("{} - partition {} in state {} was revoked!", id, partition, partitionState);
-        }
-      }
-    });
+      case RESTORING, ASSIGNED -> stateUnassigned(partition);
+      case UNASSIGNED -> log.warn("{} - partition {} in state {} was revoked!", id, partition, partitionState);
+    }
   }
 
-  private void stateRestoring(int partition, long stateBeginningOffset)
+  private void stateRestoring(int partition, long stateBeginningOffset, long stateEndOffset)
   {
     log.info(
       "{} - Changing partition-state for {}: {} -> RESTORING",
@@ -359,13 +356,6 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
       partitionStates[partition]);
     partitionStates[partition] = PartitionState.RESTORING;
 
-    phaser.register();
-    log.info(
-      "{} - Registered new party for newly assigned partition {}. New total number of parties: {}",
-      id,
-      partition,
-      phaser.getRegisteredParties());
-
     TopicPartition messagePartition = new TopicPartition(this.topic, partition);
     log.info("{} - Pausing message partition {}", id, messagePartition);
     consumer.pause(List.of(messagePartition));
@@ -377,6 +367,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
       stateBeginningOffset,
       statePartition);
     consumer.seek(statePartition, stateBeginningOffset);
+    stateEndOffsets[partition] = stateEndOffset;
     log.info("{} - Resuming state partition {}", id, statePartition);
     consumer.resume(List.of(statePartition));
   }
@@ -396,26 +387,43 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
     consumer.pause(List.of(statePartition));
 
     TopicPartition messagePartition = new TopicPartition(topic, partition);
+    log.info("{} - Adding partition {} to the assigned partitions", id, messagePartition);
+    assignedPartitions.add(messagePartition);
+    phaser.register();
+    log.info(
+      "{} - Registered new party for newly assigned partition {}. New total number of parties: {}",
+      id,
+      messagePartition,
+      phaser.getRegisteredParties());
     log.info("{} - Resuming message partition {}...", id, messagePartition);
     consumer.resume(List.of(messagePartition));
   }
 
   private void stateUnassigned(int partition)
   {
+    PartitionState oldPartitionState = partitionStates[partition];
+
     log.info(
       "{} - State-change for partition {}: {} -> UNASSIGNED",
       id,
       partition,
-      partitionStates[partition]);
+      oldPartitionState);
 
     partitionStates[partition] = PartitionState.UNASSIGNED;
 
-    phaser.arriveAndDeregister();
-    log.info(
-      "{} - Deregistered party for revoked partition {}. New total number of parties: {}",
-      id,
-      partition,
-      phaser.getRegisteredParties());
+    if (oldPartitionState == PartitionState.ASSIGNED)
+    {
+      TopicPartition messagePartition = new TopicPartition(topic, partition);
+      log.info("{} - Revoking partition {}", id, messagePartition);
+      assignedPartitions.remove(messagePartition);
+
+      phaser.arriveAndDeregister();
+      log.info(
+        "{} - Deregistered party for revoked partition {}. New total number of parties: {}",
+        id,
+        messagePartition,
+        phaser.getRegisteredParties());
+    }
   }