From: Kai Moritz Date: Mon, 28 Oct 2024 10:16:12 +0000 (+0100) Subject: Fix: `poll()` liefert nicht immer Nachrichten zu _allen_ Partitionen X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=4b1317c735cf067f8aeed41a9f746b9c6a5b73fa;p=demos%2Fkafka%2Ftraining Fix: `poll()` liefert nicht immer Nachrichten zu _allen_ Partitionen * Ein Aufruf von `poll()` liefert _nicht unbedingt_ Nachrichten zu _jeder_ Partition, die der Instanz gerade zugeteilt ist. * Daher konnte es auftreten, dass eine Phase nie beendet wurde, wenn `poll()` nur Nachrichten zu einer Untermenge der aktiven Partitionen geliefert hat. --- diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index e96bd7a..b9d8b27 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -11,10 +11,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.WakeupException; import java.time.Duration; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; +import java.util.*; import java.util.concurrent.Phaser; @@ -33,6 +30,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener private volatile boolean running = false; private final Phaser phaser = new Phaser(1); + private final Set assignedPartitions = new HashSet<>(); private volatile int[] seen; private volatile int[] acked; private volatile boolean[] done; @@ -84,8 +82,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener int phase = phaser.getPhase(); log.info("{} - Received {} messages in phase {}", id, records.count(), phase); - records - .partitions() + assignedPartitions .forEach(partition -> { seen[partition.partition()] = 0; @@ -103,6 +100,11 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener } done[partition.partition()] = true; + if (seen[partition.partition()] == 0) + { + int arrivedPhase = phaser.arrive(); + log.debug("{} - Received no records for partition {} in phase {}", id, partition, arrivedPhase); + } }); int arrivedPhase = phaser.arriveAndAwaitAdvance(); @@ -232,6 +234,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener @Override public void onPartitionsAssigned(Collection partitions) { + assignedPartitions.addAll(partitions); phaser.bulkRegister(partitions.size()); log.info( "{} - Added {} parties for newly assigned partitions. New total number of parties: {}", @@ -243,6 +246,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener @Override public void onPartitionsRevoked(Collection partitions) { + assignedPartitions.removeAll(partitions); partitions.forEach(partition -> phaser.arriveAndDeregister()); log.info( "{} - Removed {} parties for revoked partitions. New total number of parties: {}",