Fehler im Logging der aktiven Phase korrigiert und Meldungen verbessert
authorKai Moritz <kai@juplo.de>
Mon, 28 Oct 2024 10:13:05 +0000 (11:13 +0100)
committerKai Moritz <kai@juplo.de>
Sat, 2 Nov 2024 08:43:13 +0000 (09:43 +0100)
src/main/java/de/juplo/kafka/ExampleConsumer.java

index 2ad0a93..d845138 100644 (file)
@@ -77,7 +77,9 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
         ConsumerRecords<String, String> records =
             consumer.poll(Duration.ofSeconds(1));
 
-        log.info("{} - Received {} messages", id, records.count());
+        int phase = phaser.getPhase();
+
+        log.info("{} - Received {} messages in phase {}", id, records.count(), phase);
         records
           .partitions()
           .forEach(partition ->
@@ -99,8 +101,8 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
             done[partition.partition()] = true;
           });
 
-        int phase = phaser.arriveAndAwaitAdvance();
-        log.info("{} - Phase {} is done!", id, phase);
+        int arrivedPhase = phaser.arriveAndAwaitAdvance();
+        log.info("{} - Phase {} is done! Next phase: {}", id, phase, arrivedPhase);
       }
     }
     catch(WakeupException e)