Kai Moritz [Sun, 27 Oct 2024 11:03:06 +0000 (12:03 +0100)]
Rebalance-Listener: Consumer zählt die gesehenen Schlüssel
* Die Counter werden mit dem ``String``-Key indiziert
** Vorbereitung auf eine Poison-Pill Übung ist hier noch out-of-scope...
* GET-Endpoint zum Abfragen der Schlüssel-Zählungen ergänzt
** Zugriff auf die `counterMap` in `ExampleConsumer` synchronisiert.
** `CounterStateController` kopiert die Map, um mögliche konkurierende
Zugriffe während des Erzeugens der Ausgabe zu vermeiden.
* Der Zustand des Zählers wird in einem compacted Topic abgelegt
** Der Consumer zählt, welche Nachrichten gesendet und welche bestätigt
wurden.
** Über einen `Phaser` wird sichergestellt, dass alle Nachrichten von den
zuständigen Brokern bestätigt wurden, bevor der nächste ``poll()``-Aufruf
erfolgt.
* Der Value-Typ in dem Topic `state` ist jetzt auch vom Typ `String`
** Dadurch wird die Kontrolle der Ergebnisse einfacher, da alle Nachrichten
auch einfach mit `kafkacat` gelesen werden können.
* Fix: Fehler müssen wie bestätigte Nachrichten behandelt werden
** Ohne explizite Fehlerbehandlung müssen auch die nicht bestätigten
Nachrichten als `acked` gezählt werden.
** Ansonsten würde die Verarbeitung in einem ``poll()``-Durchlauf mit Fehler
hängen bleiben, da niemals alles "gesehenen" Nachrichten auch als
"bestätigt" gezählt würden.
** Dabei: Producer-Code an den aus `producer/spring-producer` angeglichen.
* Log-Meldungen zum Fortschritt beim Versenden des Zähler-Status ergänzt
* Log-Meldungen für das Senden des Zählerstands ergänzt
* Fix: Der Rebalance-Listener wurde nie registriert
* Fehler im Logging der aktiven Phase korrigiert und Meldungen verbessert
* Fix: Nachrichten wurden ggf. doppelt verarbeitet
** Wenn man in einer Schliefe die Nachrichten pro Partition separat
verarbeitet...
** ...dann sollte man in jedem Schleifendurchlauf auch nur die Nachrichten
* der gerade zu verarbeitenden Partition abrufen!
* Fix: `poll()` liefert nicht immer Nachrichten zu _allen_ Partitionen
** Ein Aufruf von `poll()` liefert _nicht unbedingt_ Nachrichten zu _jeder_
Partition, die der Instanz gerade zugeteilt ist.
** Daher konnte es auftreten, dass eine Phase nie beendet wurde, wenn
`poll()` nur Nachrichten zu einer Untermenge der aktiven Partitionen
geliefert hat.
* Der Zustand wird aus dem ``state``-Topic wiederhergestellt
* Refactor: Logik für Counter in Klasse `CounterState` extrahiert
* Refactor: DRY für state-change zu ASSIGNED
* Refactor: DRY für state-change zu UNASSIGNED
* Refactor: Neue, klarere ``switch``-Syntax
* DRY für state-change zu RESTORING
* Refactor: Handling von pause/resume vollständig in State-Change-Methoden
* Fix & Refactor: Restore-Behandlung wurde _allen_ aktiven Partitionen zuteil
** Durch das vorausgehende Refactoring wurde deutlich, dass die Behandlung,
die den _neu_ hinzugefügten Partitionen zugedacht war, allen in
`assignedPartitions` vermerkten Partitionen wiederfahren ist.
** Dies ist für den aktuellen Entwicklungsstand ggf. egal, da der wegen dem
Co-Partitioning (noch!) benötigte `RangeAssignor` eh _zuerst alle_
Partitionen entzieht, bevor er _dann alle_ neu zuteilt.
** Da der Code aber auch mit dem neuen Consumer-Rebalance Protokoll
funktionieren muss, wurde das Refactoring hier fortgeführt und so
vollendet, dass nun _alle_ Aktionenen _nur noch_ von den Callbacks
`onPartitionsAssigned()` und `onPartitionsRevoked()` ausgeht.
* Der Zählerzustand wird separat pro Partition verwaltet
** Dadurch ist es möglich, den Zustand für entzogene Partitionen zu löschen.
** D.h., bei der Ausgabe ist immer klar ersichtlich, über welchen Zustand
die angefragte Instanz gerade verfügt.
* Refactor: Zustand muss `CounterState` vollständig übergeben werden
* Refactor: Enum `PartitionState` in `State` umbenannt
* Effekte des Log-Compaction in dem Topic `state` sichtbar gemacht
* Setup mit ein bischen mehr Dampf (`README.sh` angepasst)
* TX-kompatibler Weg zur Prüfung auf eine abgeschlossene Wiederherstellung
** Der bisher verwendete Vergleich der Offset-Positionen schlägt fehl, wenn
die Implementierung um Transaktionen erweitert wird
** _Grund:_ Dann stimmt die Offset-Position nicht mehr überein, weil nach
der letzten Zustands-Nachricht noch eine, von der Transaktion erzeugte,
versteckte Nachricht folgt, die die Anwendung nie zu sehen bekommt!
* Mögliche Exception wegen konkurrierendem Zugriff auf Map verhindert
* Rückbau auf einfachen Consumer mit Statistiken zur Nachrichtenzählung
Kai Moritz [Sat, 12 Apr 2025 09:56:07 +0000 (11:56 +0200)]
Das Docker-Setup verwendet den `juplo/simple-producer:1.0-SNAPSHOT`
* Zu dem Zeitpunkt, zu dem der `juplo/simple-consumer:1.0-SNAPSHOT` in
dem Live-Coding in den `juplo/spring-consumer:1.1-SNAPSHOT` umgebaut
wird, existiert der `juplo/spring-producer:2.0-SNAPSHOT` noch nicht!
Kai Moritz [Sun, 27 Oct 2024 21:08:53 +0000 (22:08 +0100)]
`ExampleConsumer` in eine Spring-Boot App umgebaut (ohne Spring Kafka)
* Consumerspezifische Properties werden in eigener nested Class verwaltet
** Dadurch wird der Code übersichtlicher, wenn spätere Implementierungen
* _sowohl_ als Consumer, _als auch_ als Producer agieren!
* Fix: `close()` muss noch vom `ExampleConsumer` aufgerufen werden
** Der Aufruf von `close()` löst die Abmeldung der Instanz bei dem
* GroupCoordinator aus.
** Dieser Vorgang sollte noch unter der Kontrolle des Anwendungscodes
* erfolgen!
** Wenn die Methode erst von Spring aufgerufen wird, werden dann ggf. noch
* Seiteneffekte ausgelöst, die dann noch im Kontext der Instanz laufen,
* obwohl diese eigentlich schon beendet wurde!
* Ungefangene Exceptions im `ExampleConsumer` lösen das Beenden der App aus
* Das Docker-Setup verwendet den `spring-producer`
** Die Konfiguration wurde außerdem so überarbeitet, dass der Producer
mehr Nachrichten verschickt (ca. 10 Nachrichten pro Sekunde) und diese
in Batches à ca. 6 Nachrichten verpackt.
Kai Moritz [Sun, 11 Jun 2023 11:55:20 +0000 (13:55 +0200)]
Docker-Setup auf `bitnami/kafka:3.4` aktualisiert und vereinfacht
* Die Konfiguration musste an (undokumentierte?!) Änderungen in der
version 3.4 von `bitnami/kafka` angepasst werden.
* Die drei Broker spielen jetzt gleichzeitig Controller. D.h., der
Service `kafka-0`, der explizit Controller gespielt hat, fällt weg.
Kai Moritz [Thu, 8 Jun 2023 08:35:41 +0000 (10:35 +0200)]
Bedienbarkeit des Setups verbessert
* Setup starten mit `docker-compose up -t0 -d cli`
** Dabei wird _nicht_ automatisch das Topic `test` neu angelegt
** D.h., die Daten gehen nicht unbeabsichtigt verloren, wenn man mit
`up -d` prüft, ob noc alles läuft!
* Das Topic `test` kan mit `docker-compose restart -t0 setup` explizit
gelöscht und neu angelegt (aka geleert) werden.
Kai Moritz [Fri, 22 Jul 2022 18:04:07 +0000 (20:04 +0200)]
Upgrade von Spring Boot und den Confluent-Kafka-Images
* Upgrade der Kafk-Images von Confluent 7.0.2 auf 7.1.3
** Unterstützt Kafka 3.1.x (siehe https://docs.confluent.io/platform/current/installation/versions-interoperability.html[Versions-Matrix])
* Upgrade für Spring Boot von 2.6.5 auf 2.7.2
** Enthält Kafka: 3.1.1
** Enthält Spring Kafka: 2.8.8