* Der Consumer legt bei Nachrichten, deren Nachrichten-Inhalt (als Zahl
betrachtet) modulo 7 glatt aufgeht, eine zufällige Schaffens-Pause von
0 - 2000 ms ein.
* Außerdem wurde `max.poll.interval.ms` auf 5 s heruntergesetzt und
`max.poll.records` auf 30 Nachrichten.
* Die Werte wurden über fleißiges ausprobieren so abgestimmt, dass dieser
Consumer regelmäßig daran scheitert, alle Nachrichten innerhalb der
vorgegebenen 5000 ms zu verarbeiten, so dass man in den Logs sehr gut
den asynchron durch den Backrgound-Thread erzeugten Leave-Group Request
erkennen kann, der bereits erfolgt, _bevor_ der Consumer alle Nachrichten
verarbeitet hat, erneut `poll()` aufruft und _erst dann_ erfährt, dass
ihm die Partitionen inzwischen entzogen wurden.
* Um dies noch besser erkennen zu können, wurden außerdem Log-Meldungen vor
und nach der Schaffens-Pause und - insbesondere - nach der Verarbeitung
aller erhaltenen Nachrichten eingefügt.
* *BEACHTE:* Der Consumer scheitert nur, wenn sich zuvor ein gewisser
Rückstau gebildet hat. Solange er in "Echtzeit" eine Nachricht pro Sekunde
erhält, hat er natürlich keine Probleme.
* Um den Fehler auszulösen, muss der (ansonsten nicht angepasste) Producer
also mind. 30 Sekunden gelaufen sein, ohne dass ein anderer Consumer die
Nachrichten konsumiert hat.
props.put("client.id", clientId); // Nur zur Wiedererkennung
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
+ props.put("max.poll.interval.ms", 5000);
+ props.put("max.poll.records", 30);
this.id = clientId;
this.topic = topic;
log.info("{} - Received {} messages", id, records.count());
for (ConsumerRecord<String, String> record : records)
{
+ long i = Long.parseLong(record.value());
+ if (i % 7 == 0)
+ {
+ long ms = (long) (Math.random() * 2000);
+ log.info("{} - Sleeping for {} ms", id, ms);
+ Thread.sleep(ms);
+ log.info("{} - Awakening...", id);
+ }
handleRecord(
record.topic(),
record.partition(),
record.key(),
record.value());
}
+ log.info("{} - All {} messages consumed!", id, records.count());
}
}
catch(WakeupException e)