Fix: `poll()` liefert nicht immer Nachrichten zu _allen_ Partitionen
authorKai Moritz <kai@juplo.de>
Mon, 28 Oct 2024 10:16:12 +0000 (11:16 +0100)
committerKai Moritz <kai@juplo.de>
Sat, 2 Nov 2024 08:43:13 +0000 (09:43 +0100)
* Ein Aufruf von `poll()` liefert _nicht unbedingt_ Nachrichten zu _jeder_
  Partition, die der Instanz gerade zugeteilt ist.
* Daher konnte es auftreten, dass eine Phase nie beendet wurde, wenn
  `poll()` nur Nachrichten zu einer Untermenge der aktiven Partitionen
  geliefert hat.

src/main/java/de/juplo/kafka/ExampleConsumer.java

index b5845ca..9485110 100644 (file)
@@ -11,10 +11,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
 
 import java.time.Duration;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.Phaser;
 
 
@@ -32,6 +29,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
 
   private volatile boolean running = false;
   private final Phaser phaser = new Phaser(1);
+  private final Set<TopicPartition> assignedPartitions = new HashSet<>();
   private volatile int[] seen;
   private volatile int[] acked;
   private volatile boolean[] done;
@@ -80,8 +78,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
         int phase = phaser.getPhase();
 
         log.info("{} - Received {} messages in phase {}", id, records.count(), phase);
-        records
-          .partitions()
+        assignedPartitions
           .forEach(partition ->
           {
             seen[partition.partition()] = 0;
@@ -99,6 +96,11 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
             }
 
             done[partition.partition()] = true;
+            if (seen[partition.partition()] == 0)
+            {
+              int arrivedPhase = phaser.arrive();
+              log.debug("{} - Received no records for partition {} in phase {}", id, partition, arrivedPhase);
+            }
           });
 
         int arrivedPhase = phaser.arriveAndAwaitAdvance();
@@ -224,6 +226,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
   @Override
   public void onPartitionsAssigned(Collection<TopicPartition> partitions)
   {
+    assignedPartitions.addAll(partitions);
     phaser.bulkRegister(partitions.size());
     log.info(
       "{} - Added {} parties for newly assigned partitions. New total number of parties: {}",
@@ -235,6 +238,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
   @Override
   public void onPartitionsRevoked(Collection<TopicPartition> partitions)
   {
+    assignedPartitions.removeAll(partitions);
     partitions.forEach(partition -> phaser.arriveAndDeregister());
     log.info(
       "{} - Removed {} parties for revoked partitions. New total number of parties: {}",