Log-Compaction: Der Zustand wird in einem Topic gespeichert consumer/spring-consumer--log-compaction consumer/spring-consumer--log-compaction--2025-03-18--19-42 consumer/spring-consumer--log-compaction--2025-03-signal consumer/spring-consumer--log-compaction--2025-04-signal
authorKai Moritz <kai@juplo.de>
Fri, 21 Feb 2025 10:41:54 +0000 (11:41 +0100)
committerKai Moritz <kai@juplo.de>
Sat, 15 Mar 2025 18:19:51 +0000 (19:19 +0100)
commitc9530eaa16e300c5ed812b72cfaa86e816c75824
tree2e091752890e157fd141fe1a0678498b704c5a1c
parent294478c3c3d86ae8ff7246334790f616032f1559
Log-Compaction: Der Zustand wird in einem Topic gespeichert

* Die Counter werden mit dem ``String``-Key indiziert
** Vorbereitung auf eine Poison-Pill Übung ist hier noch out-of-scope...
* GET-Endpoint zum Abfragen der Schlüssel-Zählungen ergänzt
** Zugriff auf die `counterMap` in `ExampleConsumer` synchronisiert.
** `CounterStateController` kopiert die Map, um mögliche konkurierende
   Zugriffe während des Erzeugens der Ausgabe zu vermeiden.
* Der Zustand des Zählers wird in einem compacted Topic abgelegt
** Der Consumer zählt, welche Nachrichten gesendet und welche bestätigt
   wurden.
** Über einen `Phaser` wird sichergestellt, dass alle Nachrichten von den
   zuständigen Brokern bestätigt wurden, bevor der nächste ``poll()``-Aufruf
   erfolgt.
* Der Value-Typ in dem Topic `state` ist jetzt auch vom Typ `String`
** Dadurch wird die Kontrolle der Ergebnisse einfacher, da alle Nachrichten
   auch einfach mit `kafkacat` gelesen werden können.
* Fix: Fehler müssen wie bestätigte Nachrichten behandelt werden
** Ohne explizite Fehlerbehandlung müssen auch die nicht bestätigten
   Nachrichten als `acked` gezählt werden.
** Ansonsten würde die Verarbeitung in einem ``poll()``-Durchlauf mit Fehler
   hängen bleiben, da niemals alles "gesehenen" Nachrichten auch als
   "bestätigt" gezählt würden.
** Dabei: Producer-Code an den aus `producer/spring-producer` angeglichen.
* Log-Meldungen zum Fortschritt beim Versenden des Zähler-Status ergänzt
* Log-Meldungen für das Senden des Zählerstands ergänzt
* Fix: Der Rebalance-Listener wurde nie registriert
* Fehler im Logging der aktiven Phase korrigiert und Meldungen verbessert
* Fix: Nachrichten wurden ggf. doppelt verarbeitet
** Wenn man in einer Schliefe die Nachrichten pro Partition separat
   verarbeitet...
** ...dann sollte man in jedem Schleifendurchlauf auch nur die Nachrichten
*   der gerade zu verarbeitenden Partition abrufen!
* Fix: `poll()` liefert nicht immer Nachrichten zu _allen_ Partitionen
** Ein Aufruf von `poll()` liefert _nicht unbedingt_ Nachrichten zu _jeder_
   Partition, die der Instanz gerade zugeteilt ist.
** Daher konnte es auftreten, dass eine Phase nie beendet wurde, wenn
   `poll()` nur Nachrichten zu einer Untermenge der aktiven Partitionen
   geliefert hat.
* Der Zustand wird aus dem ``state``-Topic wiederhergestellt
* Refactor: Logik für Counter in Klasse `CounterState` extrahiert
* Refactor: DRY für state-change zu ASSIGNED
* Refactor: DRY für state-change zu UNASSIGNED
* Refactor: Neue, klarere ``switch``-Syntax
* DRY für state-change zu RESTORING
* Refactor: Handling von pause/resume vollständig in State-Change-Methoden
* Fix & Refactor: Restore-Behandlung wurde _allen_ aktiven Partitionen zuteil
** Durch das vorausgehende Refactoring wurde deutlich, dass die Behandlung,
   die den _neu_ hinzugefügten Partitionen zugedacht war, allen in
   `assignedPartitions` vermerkten Partitionen wiederfahren ist.
** Dies ist für den aktuellen Entwicklungsstand ggf. egal, da der wegen dem
   Co-Partitioning (noch!) benötigte `RangeAssignor` eh _zuerst alle_
   Partitionen entzieht, bevor er _dann alle_ neu zuteilt.
** Da der Code aber auch mit dem neuen Consumer-Rebalance Protokoll
   funktionieren muss, wurde das Refactoring hier fortgeführt und so
   vollendet, dass nun _alle_ Aktionenen _nur noch_ von den Callbacks
   `onPartitionsAssigned()` und `onPartitionsRevoked()` ausgeht.
* Der Zählerzustand wird separat pro Partition verwaltet
** Dadurch ist es möglich, den Zustand für entzogene Partitionen zu löschen.
** D.h., bei der Ausgabe ist immer klar ersichtlich, über welchen Zustand
   die angefragte Instanz gerade verfügt.
* Refactor: Zustand muss `CounterState` vollständig übergeben werden
* Refactor: Enum `PartitionState` in `State` umbenannt
* Effekte des Log-Compaction in dem Topic `state` sichtbar gemacht
* Setup mit ein bischen mehr Dampf (`README.sh` angepasst)
* TX-kompatibler Weg zur Prüfung auf eine abgeschlossene Wiederherstellung
** Der bisher verwendete Vergleich der Offset-Positionen schlägt fehl, wenn
   die Implementierung um Transaktionen erweitert wird
** _Grund:_ Dann stimmt die Offset-Position nicht mehr überein, weil nach
   der letzten Zustands-Nachricht noch eine, von der Transaktion erzeugte,
   versteckte Nachricht folgt, die die Anwendung nie zu sehen bekommt!
* Mögliche Exception wegen konkurrierendem Zugriff auf Map verhindert
* Rückbau auf einfachen Consumer mit Statistiken zur Nachrichtenzählung,
  um an dem Beispiel die Verwendung des Rebalance-Listeners vorführen zu
  können. (D.h., das Speichern/Wiederherstellen des Zustands wurde
  entfernt.)
* Umkehrung des Rückbaus, so dass der Zustand wieder in einem Topic mit
  Log-Compaction gespeichert und aus diesem Wiederhergestellt wird.
* Die bisherige Implementierung notdürftig an die Typisierung angeapsst.
README.sh
docker/docker-compose.yml
pom.xml
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/ApplicationProperties.java
src/main/java/de/juplo/kafka/ExampleConsumer.java
src/main/resources/application.yml
src/test/java/de/juplo/kafka/ApplicationTests.java