`commitAsync()` in `onPartitionsRevoked()`
authorKai Moritz <kai@juplo.de>
Tue, 23 Aug 2022 15:55:14 +0000 (17:55 +0200)
committerKai Moritz <kai@juplo.de>
Fri, 26 Aug 2022 10:52:44 +0000 (12:52 +0200)
commit12c1ce703ef76b75d995f94d1689c894dde1406a
treed177abec262d45592c126530e705b6489cc2979e
parent3fcb695db13281056c541a10a08abff4d162495c
`commitAsync()` in `onPartitionsRevoked()`

* Der Rebalance-Listener führt jetzt einen (zusätzlichen) Commit der
  Offsets durch.
* Ohne diesen Commit, kann es bei sehr hohem Nachrichten-Aufkommen dazu
  kommen, dass nicht die letztendliche Offset-Position gespeichert wird.
* Da jetzt implizit ein Commit in `onPartitionsRevoked()` durchgeführt
  wird, ist kein expliziter Commit mehr nötig, wenn der Consumer durch
  eine Exception unterbrochen wird, bei der sichergestellt ist, dass die
  zuletzt durch `poll()` gelieferten Nachrichten vollständig verarbeitet
  wurden.
** Bei einer `WakeupException` ist dies klar, da diese in `poll()`
   geworfen wurde, also _nachdem_ die Implementierung durch den Aufruf
   von `poll()` signalisiert hat, dass sie alle zuvor gelieferten
   Nachrichten vollständig verarbeitet hat.
** Bei einer `RecordDeserializationException` ist dies klar, da ein
   Fehler während der Deserialisierung der vom Broker empfangenen
   Nachrichten dazu führt, dass die Kafka-Client-Library die zuvor
   fehlerfrei deserialisierten Nachrichten ausliefert und dies auch
   entsprechend in den intern mitgeführten Offset-Positionen reflektiert.
* Der Commit wird hier asynchron durchgeführt.
* TODO: Das führt dazu, dass die Implementierung in dem Rebalance
  einfriert, da der Callback, auf den sie wartet, dort nie aufgerufen
  wird, da die Commit-Callbacks nur "synchron" abgearbeitet werden, wenn
  die `poll()`-Methode aufgerufen wird!
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java
src/main/java/de/juplo/kafka/EndlessConsumer.java