From 088f8dd11ec718a9c2010cb764938913d54fe3ee Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Wed, 10 Jun 2026 17:06:06 +0200 Subject: [PATCH] Version des Consumers, die bei Last an Dauer des Poll-Loop scheitert MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit * Der Consumer legt bei Nachrichten, deren Nachrichten-Inhalt (als Zahl betrachtet) modulo 7 glatt aufgeht, eine zufällige Schaffens-Pause von 0 - 2000 ms ein. * Außerdem wurde `max.poll.interval.ms` auf 5 s heruntergesetzt und `max.poll.records` auf 30 Nachrichten. * Die Werte wurden über fleißiges ausprobieren so abgestimmt, dass dieser Consumer regelmäßig daran scheitert, alle Nachrichten innerhalb der vorgegebenen 5000 ms zu verarbeiten, so dass man in den Logs sehr gut den asynchron durch den Backrgound-Thread erzeugten Leave-Group Request erkennen kann, der bereits erfolgt, _bevor_ der Consumer alle Nachrichten verarbeitet hat, erneut `poll()` aufruft und _erst dann_ erfährt, dass ihm die Partitionen inzwischen entzogen wurden. * Um dies noch besser erkennen zu können, wurden außerdem Log-Meldungen vor und nach der Schaffens-Pause und - insbesondere - nach der Verarbeitung aller erhaltenen Nachrichten eingefügt. * *BEACHTE:* Der Consumer scheitert nur, wenn sich zuvor ein gewisser Rückstau gebildet hat. Solange er in "Echtzeit" eine Nachricht pro Sekunde erhält, hat er natürlich keine Probleme. * Um den Fehler auszulösen, muss der (ansonsten nicht angepasste) Producer also mind. 30 Sekunden gelaufen sein, ohne dass ein anderer Consumer die Nachrichten konsumiert hat. --- src/main/java/de/juplo/kafka/ExampleConsumer.java | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index 36ddc70d..a6bc1dd5 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -38,6 +38,8 @@ public class ExampleConsumer implements ConsumerRebalanceListener props.put("client.id", clientId); // Nur zur Wiedererkennung props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", StringDeserializer.class.getName()); + props.put("max.poll.interval.ms", 5000); + props.put("max.poll.records", 30); this.id = clientId; this.topic = topic; @@ -60,6 +62,14 @@ public class ExampleConsumer implements ConsumerRebalanceListener log.info("{} - Received {} messages", id, records.count()); for (ConsumerRecord record : records) { + long i = Long.parseLong(record.value()); + if (i % 7 == 0) + { + long ms = (long) (Math.random() * 2000); + log.info("{} - Sleeping for {} ms", id, ms); + Thread.sleep(ms); + log.info("{} - Awakening...", id); + } handleRecord( record.topic(), record.partition(), @@ -67,6 +77,7 @@ public class ExampleConsumer implements ConsumerRebalanceListener record.key(), record.value()); } + log.info("{} - All {} messages consumed!", id, records.count()); } } catch(WakeupException e) -- 2.39.5