Fehler im Logging der aktiven Phase korrigiert und Meldungen verbessert
authorKai Moritz <kai@juplo.de>
Mon, 28 Oct 2024 10:13:05 +0000 (11:13 +0100)
committerKai Moritz <kai@juplo.de>
Sun, 10 Nov 2024 13:27:08 +0000 (14:27 +0100)
src/main/java/de/juplo/kafka/ExampleConsumer.java

index 3acaebe..385df67 100644 (file)
@@ -81,7 +81,9 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
         ConsumerRecords<String, String> records =
             consumer.poll(Duration.ofSeconds(1));
 
-        log.info("{} - Received {} messages", id, records.count());
+        int phase = phaser.getPhase();
+
+        log.info("{} - Received {} messages in phase {}", id, records.count(), phase);
         records
           .partitions()
           .forEach(partition ->
@@ -103,8 +105,8 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
             done[partition.partition()] = true;
           });
 
-        int phase = phaser.arriveAndAwaitAdvance();
-        log.info("{} - Phase {} is done!", id, phase);
+        int arrivedPhase = phaser.arriveAndAwaitAdvance();
+        log.info("{} - Phase {} is done! Next phase: {}", id, phase, arrivedPhase);
       }
     }
     catch(WakeupException e)