Fix: `poll()` liefert nicht immer Nachrichten zu _allen_ Partitionen
authorKai Moritz <kai@juplo.de>
Mon, 28 Oct 2024 10:16:12 +0000 (11:16 +0100)
committerKai Moritz <kai@juplo.de>
Sun, 10 Nov 2024 13:27:08 +0000 (14:27 +0100)
* Ein Aufruf von `poll()` liefert _nicht unbedingt_ Nachrichten zu _jeder_
  Partition, die der Instanz gerade zugeteilt ist.
* Daher konnte es auftreten, dass eine Phase nie beendet wurde, wenn
  `poll()` nur Nachrichten zu einer Untermenge der aktiven Partitionen
  geliefert hat.

src/main/java/de/juplo/kafka/ExampleConsumer.java

index 7a4b5d1..77b58ea 100644 (file)
@@ -11,10 +11,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
 
 import java.time.Duration;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.Phaser;
 
 
@@ -33,6 +30,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
 
   private volatile boolean running = false;
   private final Phaser phaser = new Phaser(1);
+  private final Set<TopicPartition> assignedPartitions = new HashSet<>();
   private volatile int[] seen;
   private volatile int[] acked;
   private volatile boolean[] done;
@@ -84,8 +82,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
         int phase = phaser.getPhase();
 
         log.info("{} - Received {} messages in phase {}", id, records.count(), phase);
-        records
-          .partitions()
+        assignedPartitions
           .forEach(partition ->
           {
             seen[partition.partition()] = 0;
@@ -103,6 +100,11 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
             }
 
             done[partition.partition()] = true;
+            if (seen[partition.partition()] == 0)
+            {
+              int arrivedPhase = phaser.arrive();
+              log.debug("{} - Received no records for partition {} in phase {}", id, partition, arrivedPhase);
+            }
           });
 
         int arrivedPhase = phaser.arriveAndAwaitAdvance();
@@ -232,6 +234,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
   @Override
   public void onPartitionsAssigned(Collection<TopicPartition> partitions)
   {
+    assignedPartitions.addAll(partitions);
     phaser.bulkRegister(partitions.size());
     log.info(
       "{} - Added {} parties for newly assigned partitions. New total number of parties: {}",
@@ -243,6 +246,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
   @Override
   public void onPartitionsRevoked(Collection<TopicPartition> partitions)
   {
+    assignedPartitions.removeAll(partitions);
     partitions.forEach(partition -> phaser.arriveAndDeregister());
     log.info(
       "{} - Removed {} parties for revoked partitions. New total number of parties: {}",