]> juplo.de Git - demos/kafka/training/commitdiff
Version des Consumers, die bei Last an Dauer des Poll-Loop scheitert consumer/simple-consumer--max-poll-interval-ms consumer/simple-consumer--max-poll-interval-ms--2026-06-lvm--rebase-vollständig
authorKai Moritz <kai@juplo.de>
Wed, 10 Jun 2026 15:06:06 +0000 (17:06 +0200)
committerKai Moritz <kai.milan.moritz@googlemail.com>
Fri, 12 Jun 2026 18:43:30 +0000 (18:43 +0000)
* Der Consumer legt bei Nachrichten, deren Nachrichten-Inhalt (als Zahl
  betrachtet) modulo 7 glatt aufgeht, eine zufällige Schaffens-Pause von
  0 - 2000 ms ein.
* Außerdem wurde `max.poll.interval.ms` auf 5 s heruntergesetzt und
  `max.poll.records` auf 30 Nachrichten.
* Die Werte wurden über fleißiges ausprobieren so abgestimmt, dass dieser
  Consumer regelmäßig daran scheitert, alle Nachrichten innerhalb der
  vorgegebenen 5000 ms zu verarbeiten, so dass man in den Logs sehr gut
  den asynchron durch den Backrgound-Thread erzeugten Leave-Group Request
  erkennen kann, der bereits erfolgt, _bevor_ der Consumer alle Nachrichten
  verarbeitet hat, erneut `poll()` aufruft und _erst dann_ erfährt, dass
  ihm die Partitionen inzwischen entzogen wurden.
* Um dies noch besser erkennen zu können, wurden außerdem Log-Meldungen vor
  und nach der Schaffens-Pause und - insbesondere - nach der Verarbeitung
  aller erhaltenen Nachrichten eingefügt.
* *BEACHTE:* Der Consumer scheitert nur, wenn sich zuvor ein gewisser
  Rückstau gebildet hat. Solange er in "Echtzeit" eine Nachricht pro Sekunde
  erhält, hat er natürlich keine Probleme.
* Um den Fehler auszulösen, muss der (ansonsten nicht angepasste) Producer
  also mind. 30 Sekunden gelaufen sein, ohne dass ein anderer Consumer die
  Nachrichten konsumiert hat.

src/main/java/de/juplo/kafka/ExampleConsumer.java

index 36ddc70d45eef8f2eae09a5c11fc53c1e3cbf713..a6bc1dd50b183add2280c7d7b5f7ce9a452c251d 100644 (file)
@@ -38,6 +38,8 @@ public class ExampleConsumer implements ConsumerRebalanceListener
     props.put("client.id", clientId); // Nur zur Wiedererkennung
     props.put("key.deserializer", StringDeserializer.class.getName());
     props.put("value.deserializer", StringDeserializer.class.getName());
+    props.put("max.poll.interval.ms", 5000);
+    props.put("max.poll.records", 30);
 
     this.id = clientId;
     this.topic = topic;
@@ -60,6 +62,14 @@ public class ExampleConsumer implements ConsumerRebalanceListener
         log.info("{} - Received {} messages", id, records.count());
         for (ConsumerRecord<String, String> record : records)
         {
+          long i = Long.parseLong(record.value());
+          if (i % 7 == 0)
+          {
+            long ms = (long) (Math.random() * 2000);
+            log.info("{} - Sleeping for {} ms", id, ms);
+            Thread.sleep(ms);
+            log.info("{} - Awakening...", id);
+          }
           handleRecord(
             record.topic(),
             record.partition(),
@@ -67,6 +77,7 @@ public class ExampleConsumer implements ConsumerRebalanceListener
             record.key(),
             record.value());
         }
+        log.info("{} - All {} messages consumed!", id, records.count());
       }
     }
     catch(WakeupException e)