Rebalance-Listener: Consumer zählt die gesehenen Schlüssel consumer/spring-consumer--rebalance-listener--generics4all consumer/spring-consumer--rebalance-listener--generics4some
authorKai Moritz <kai@juplo.de>
Sun, 27 Oct 2024 11:03:06 +0000 (12:03 +0100)
committerKai Moritz <kai@juplo.de>
Fri, 21 Feb 2025 10:58:17 +0000 (11:58 +0100)
commitefb2f90f62b1e5bcf726bb7015d7d778e30ef368
treed808ce8f22d6b87df4bc728776e164d953f3d84f
parent0f228ca3fc0291c6049d0ba2092a9512207b61c8
Rebalance-Listener: Consumer zählt die gesehenen Schlüssel

* Die Counter werden mit dem ``String``-Key indiziert
** Vorbereitung auf eine Poison-Pill Übung ist hier noch out-of-scope...
* GET-Endpoint zum Abfragen der Schlüssel-Zählungen ergänzt
** Zugriff auf die `counterMap` in `ExampleConsumer` synchronisiert.
** `CounterStateController` kopiert die Map, um mögliche konkurierende
   Zugriffe während des Erzeugens der Ausgabe zu vermeiden.
* Der Zustand des Zählers wird in einem compacted Topic abgelegt
** Der Consumer zählt, welche Nachrichten gesendet und welche bestätigt
   wurden.
** Über einen `Phaser` wird sichergestellt, dass alle Nachrichten von den
   zuständigen Brokern bestätigt wurden, bevor der nächste ``poll()``-Aufruf
   erfolgt.
* Der Value-Typ in dem Topic `state` ist jetzt auch vom Typ `String`
** Dadurch wird die Kontrolle der Ergebnisse einfacher, da alle Nachrichten
   auch einfach mit `kafkacat` gelesen werden können.
* Fix: Fehler müssen wie bestätigte Nachrichten behandelt werden
** Ohne explizite Fehlerbehandlung müssen auch die nicht bestätigten
   Nachrichten als `acked` gezählt werden.
** Ansonsten würde die Verarbeitung in einem ``poll()``-Durchlauf mit Fehler
   hängen bleiben, da niemals alles "gesehenen" Nachrichten auch als
   "bestätigt" gezählt würden.
** Dabei: Producer-Code an den aus `producer/spring-producer` angeglichen.
* Log-Meldungen zum Fortschritt beim Versenden des Zähler-Status ergänzt
* Log-Meldungen für das Senden des Zählerstands ergänzt
* Fix: Der Rebalance-Listener wurde nie registriert
* Fehler im Logging der aktiven Phase korrigiert und Meldungen verbessert
* Fix: Nachrichten wurden ggf. doppelt verarbeitet
** Wenn man in einer Schliefe die Nachrichten pro Partition separat
   verarbeitet...
** ...dann sollte man in jedem Schleifendurchlauf auch nur die Nachrichten
*   der gerade zu verarbeitenden Partition abrufen!
* Fix: `poll()` liefert nicht immer Nachrichten zu _allen_ Partitionen
** Ein Aufruf von `poll()` liefert _nicht unbedingt_ Nachrichten zu _jeder_
   Partition, die der Instanz gerade zugeteilt ist.
** Daher konnte es auftreten, dass eine Phase nie beendet wurde, wenn
   `poll()` nur Nachrichten zu einer Untermenge der aktiven Partitionen
   geliefert hat.
* Der Zustand wird aus dem ``state``-Topic wiederhergestellt
* Refactor: Logik für Counter in Klasse `CounterState` extrahiert
* Refactor: DRY für state-change zu ASSIGNED
* Refactor: DRY für state-change zu UNASSIGNED
* Refactor: Neue, klarere ``switch``-Syntax
* DRY für state-change zu RESTORING
* Refactor: Handling von pause/resume vollständig in State-Change-Methoden
* Fix & Refactor: Restore-Behandlung wurde _allen_ aktiven Partitionen zuteil
** Durch das vorausgehende Refactoring wurde deutlich, dass die Behandlung,
   die den _neu_ hinzugefügten Partitionen zugedacht war, allen in
   `assignedPartitions` vermerkten Partitionen wiederfahren ist.
** Dies ist für den aktuellen Entwicklungsstand ggf. egal, da der wegen dem
   Co-Partitioning (noch!) benötigte `RangeAssignor` eh _zuerst alle_
   Partitionen entzieht, bevor er _dann alle_ neu zuteilt.
** Da der Code aber auch mit dem neuen Consumer-Rebalance Protokoll
   funktionieren muss, wurde das Refactoring hier fortgeführt und so
   vollendet, dass nun _alle_ Aktionenen _nur noch_ von den Callbacks
   `onPartitionsAssigned()` und `onPartitionsRevoked()` ausgeht.
* Der Zählerzustand wird separat pro Partition verwaltet
** Dadurch ist es möglich, den Zustand für entzogene Partitionen zu löschen.
** D.h., bei der Ausgabe ist immer klar ersichtlich, über welchen Zustand
   die angefragte Instanz gerade verfügt.
* Refactor: Zustand muss `CounterState` vollständig übergeben werden
* Refactor: Enum `PartitionState` in `State` umbenannt
* Effekte des Log-Compaction in dem Topic `state` sichtbar gemacht
* Setup mit ein bischen mehr Dampf (`README.sh` angepasst)
* TX-kompatibler Weg zur Prüfung auf eine abgeschlossene Wiederherstellung
** Der bisher verwendete Vergleich der Offset-Positionen schlägt fehl, wenn
   die Implementierung um Transaktionen erweitert wird
** _Grund:_ Dann stimmt die Offset-Position nicht mehr überein, weil nach
   der letzten Zustands-Nachricht noch eine, von der Transaktion erzeugte,
   versteckte Nachricht folgt, die die Anwendung nie zu sehen bekommt!
* Mögliche Exception wegen konkurrierendem Zugriff auf Map verhindert
* Rückbau auf einfachen Consumer mit Statistiken zur Nachrichtenzählung
README.sh
docker/docker-compose.yml
pom.xml
src/main/java/de/juplo/kafka/CounterState.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/CounterStateController.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/ExampleConsumer.java
src/test/java/de/juplo/kafka/ApplicationTests.java