summary |
shortlog | log |
commit |
commitdiff |
tree
first ⋅ prev ⋅ next
Kai Moritz [Fri, 15 Apr 2022 12:01:07 +0000 (14:01 +0200)]
Springify: ROT - Merge des verschärften Tests aus der Vanilla-Version
* Ohne die Verschärfung des Tests war der Test grün, obwohl die Anwendung
entgegen den Erwartungen trotz der eingetreuten Poison-Pill _alle_
Nachrichten gelesen hat.
* Jetzt schlägt der Test wie erwartet fehl.
Kai Moritz [Fri, 15 Apr 2022 09:51:08 +0000 (11:51 +0200)]
Springify: GRÜN - `start()`/`stop()` werden im Test explizit aufgerufen
Kai Moritz [Fri, 15 Apr 2022 09:30:56 +0000 (11:30 +0200)]
Springify: Die `@PreDestroy`-Methode wird nicht benötigt
* Spring Kafka prüft schon zuvor selbst, ob der Container noch läuft und
fährt ihn ggf. herunter.
Kai Moritz [Fri, 15 Apr 2022 09:29:48 +0000 (11:29 +0200)]
Springify: Start/Stop prüft, ob der Container schon/noch läuft
Kai Moritz [Fri, 15 Apr 2022 08:56:40 +0000 (10:56 +0200)]
Springify: `start()`/`stop()`/`destroy()` in EndlessConsumer wiederbelebt
* Die Logik für Start/Stop liegt jetzt wieder wie vor der Umstellung in
dem EndlessConsumer
* Der `ApplicationRunner` kenn den `EndlessConsumer` und ruft für diese
`start()` auf
* Entsprechende Aufrufe im `DriverController` wiederbelebt.
* Das Stoppen beim Herunterfahren der App wird wieder über eine
`@PreDestroy`-Methode im `EndlessConsumer` realisiert.
Kai Moritz [Fri, 15 Apr 2022 08:39:17 +0000 (10:39 +0200)]
Springify: ROT - Auto Startup in @KafkaListener deaktiviert
* Ziel, näher an die Funktion des Vanilla-EndlessConsumer heranrücken.
Der Testfall soll möglichst unverändert funktionieren.
* Unklar, ob das für die Schulung hilfreich ist.
* Hilft aber definitiv beim Verstehen der Mechanismen von Spring Kafka
* Hier wurde jetzt erst mal der automatische Start des Containers
unterbunden und stattdessen `Application` wieder zu einem
`ApplicationRunner` mit `@PreDestroy`-Methode gemacht.
* TODO: Testfall ist darüber erst mal ROT, da der Container dort jetzt
nicht gestartet wird! Das war ja aber genau das Ziel, denn bei den
übernommenen Testfällen gab es (zumindest theoretisch) Race-Conditions,
weil der Container - anders als bei der Vanilla-Implementierung - immer
schon lief.
Kai Moritz [Fri, 15 Apr 2022 10:22:57 +0000 (12:22 +0200)]
Tests: Refaktorisiert - Nachrichten werden für alle Tests aufgezeichnet
Kai Moritz [Fri, 15 Apr 2022 10:17:23 +0000 (12:17 +0200)]
Tests: Fehlerfall-Test prüft, dass nicht alle Nachrichten gelesen wurden
Kai Moritz [Fri, 15 Apr 2022 10:08:50 +0000 (12:08 +0200)]
Tests: Assert-Beschreibungen korrigiert/ergänzt
Kai Moritz [Thu, 14 Apr 2022 16:56:04 +0000 (18:56 +0200)]
Springify: BatchListener konfiguriert - Hilft nicht wirklich...
* Unerwartete aber eigentlich logische Folge:
** Es werden weiterhing alle 100 Nachrichten abgeholt
** Der Testfall schlägt trotzdem nicht fehl, da die Erwartung an die
gespeicherten Offsets dynamisch aus den tatsächlich vom Listener
bezogenen Nachrichten bestimmt wird.
* Der Effekt tritt ein, weil der `ErrorHandlingDeserializer` ja die
verursachende Exception schluckt und dafür eine Behandlung des Fehlers
vornimmt, die den Fehler für die nachfolgenden Komponenten transparenter
machen soll.
* Es ist aber eher das Gegenteil der Fall, da die Folge ist, dass der
`poll()`-Aufruf alle Nachrichten - _auch die nach dem Fehler!_ - abholt.
* D.h., es ist nicht mehr so simpel und elegant erkennbar, an welcher
Stelle die `RecordDeserializationException` aufgetreten ist
* Wenn man - wie hier beabsichtigt - die Konsumption beim Auftreten des
dieser Sorte Ausnahme-Fehler stoppen und die Offsets für alle _vor_ dem
Auftreten des Fehlers fehlerfrei empfangenen Nachrichten dafür speichern
will, dann ist dies ohne eine umständliche Offset-Verwaltung und vielen
`seek()`-Aufrufen nicht mehr möglich!
* Eine einfache Abhilfe scheint nicht möglich, da der
`KafkaMessageListenerContainer` nicht damit umgehen kann, wenn _in_ dem
`poll()` eine Exception geworfen wird und dann - wie beobachtet - in
einer Endlosschleife hängen bleibt.
Kai Moritz [Wed, 13 Apr 2022 21:09:49 +0000 (23:09 +0200)]
Springify: `CommonContainerStoppingErrorHandler` für erwartetes Verhalten
* Der `CommonContainerStoppingErrorHandler` stoppt den Container beim
ersten Auftreten eines Fehlers.
* Dadurch ist das erwartete Verhalten - soweit bisher durch die Tests
definiert - wiederhergestellt.
* Der Handler wird den Container aber auch bei einem Fehler im Listener
stoppen, so dass in dem Fall wahrscheinlich noch nachgebessert werden
muss.
Kai Moritz [Wed, 13 Apr 2022 20:18:21 +0000 (22:18 +0200)]
Springify: `ErrorHandlingDeserializer` bricht die Schleife
* Die Konfiguration eines `ErrorHandlingDeserializer` durchbricht die
Endlosschleife.
* Das Default-Verhalten von Spring Kafka ist aber dann in dem Fall, den
Fehler zu loggen und dann zu skippen.
* Dieses Verhalten wird von dem `DefaultErrorHandler` vorgegeben
Kai Moritz [Tue, 12 Apr 2022 22:38:24 +0000 (00:38 +0200)]
Springify: Kernfunktion von EndlessConsumer über Spring-Kafka
* Alle weiteren Funktionen für dieses erste Experiment erst mal entfernt
* Testfall entsprechend angepasst
* Der Commit passiert hier, weil Spring Kafka per Default den eignen
Commit-Modus `BATCH` aktiviert, der nach jedem abgearbeiteten `poll()`
einen (synchronen!) Commit durchführt
* Der Test ist zwar grün, wenn man die App normal startet, verfängt sie
sich jedoch in einer Endlosschleife, da kein Error-Handler konfiguriert
ist, der die `RecordDeserializationException` korrekt behandeln kann, so
dass sich die App in einem Life-Deadlock befindet, in dem sie immer
wieder den Datensatz erhält, der den Fehler ausgelöst hat, dadurch
aber nicht wie ein Vanilla-Consumer beendet wird, da Spring Kafka die
Exception abfängt und weitermacht: neuer `poll()` für die selbe Position,
so dass die App aus Sicht des GroupCoordinator noch lebt.
* DEBUG-Logging für `org.springframework.kafka` aktiviert, damit man die
Commits sieht.
Kai Moritz [Mon, 11 Apr 2022 14:19:44 +0000 (16:19 +0200)]
Tests: Geprüft, dass der Fehler nach einem Neustart neu vorliegt
Kai Moritz [Sun, 10 Apr 2022 12:50:50 +0000 (14:50 +0200)]
Tests: Umbau für einen Commit im Fehlerfall und Anpassung des Tests
Kai Moritz [Mon, 11 Apr 2022 08:04:53 +0000 (10:04 +0200)]
Tests: Refaktorisiert - Methoden-Reihenfolge aufgeräumt und kommentiert
Kai Moritz [Mon, 11 Apr 2022 07:41:40 +0000 (09:41 +0200)]
Tests: Der Test wartet, bis die Offsets regulär committed wurden
* Zuvor wurde der Offset-Commit erzwungen, indem der EndlessConsumer
über den Aufruf von `stop()` beendet wurde.
* Jetzt wird die Überprüfung der Erwartungen über awaitility aufgeschoben,
bis die Erwartungen beobachtet werden können - oder eine Zeitschranke
gerissen wird.
Kai Moritz [Sun, 10 Apr 2022 20:50:44 +0000 (22:50 +0200)]
Tests: Der EndlessConsumer wird jetzt doch asynchron ausgeführt
* Der Test-Code wird verständlicher, wenn der Consumer asynchron läuft
* Für die Überprüfung der Erwartungen wird dann awaitility benötigt
** Da der Consumer in einem separaten Thread läuft, muss auf diesen
gezielt gewartet werden
** Dafür wird es deutlicher/verständlicher, auf welche der auch aus dem
regulären Betrieb des EndlessConsumer bekannten Zustandsänderungen
der Test wartet
* Da die Offsets für die Controlle (ggf.) abgefragt werden, während der
EndlessConsumer noch läuft, wird ein separater KafkaConsumer benötigt,
um die Offsets abzurufen.
BEACHTE: Um die Änderungen in dem Offsets-Topic zu greifen zu bekommen,
muss dieser KafkaConsumer für jede Abfrage ein `assign()` durchführen,
damit er gezwungen ist, die Offsets neu vom Broker zu erfragen.
Kai Moritz [Sun, 10 Apr 2022 13:54:26 +0000 (15:54 +0200)]
Tests: Refaktorisiert - DRY für Test auf Offset-Fortschritt
Kai Moritz [Sun, 10 Apr 2022 13:25:58 +0000 (15:25 +0200)]
Tests: Überprüft, ob überhaupt ein Offset-Fortschritt vorliegt
Kai Moritz [Sun, 10 Apr 2022 13:25:57 +0000 (15:25 +0200)]
Tests: Offsets werden unter TopicPartition abgelegt
Kai Moritz [Sun, 10 Apr 2022 14:02:07 +0000 (16:02 +0200)]
Tests: Test-Reihenfolge definiert, da das Topic nicht geleert wird
Kai Moritz [Sat, 9 Apr 2022 14:14:47 +0000 (16:14 +0200)]
Tests: Erste Version eines synchronen Integration-Test implementiert
Kai Moritz [Sat, 9 Apr 2022 16:30:22 +0000 (18:30 +0200)]
Demonstration der RecordDeserializationException
* EndlessConsumer wird mit dem Value-Type `Long` anstatt `String` erzeugt
* Setup für die Demonstration der DeserializationException überarbeitet
Kai Moritz [Sat, 9 Apr 2022 14:04:02 +0000 (16:04 +0200)]
Refaktorisierung für Tests - EndlessConsumer typisiert
Kai Moritz [Sat, 9 Apr 2022 11:40:47 +0000 (13:40 +0200)]
Refaktorisierung für Tests - Record-Handler als Bean konfigurierbar
* Ein Handler für die Verarbeitung der einzelnen ConsumerRecord's kann
jetzt ein `java.util.function.Consumer` als Bean definiert werden
* Die Nachricht wird erst nach dem Aufruf des Handlers als konsumiert
gezählt, damit der Handler die Nachricht über eine Exception ablehnen
kann.
Kai Moritz [Mon, 11 Apr 2022 08:51:01 +0000 (10:51 +0200)]
Die Spring-Boot App wird nun sauber herunter gefahren
* Zuvor wurde die App nicht richtig beendet, weil der ExecutorService nicht
beendet wurde und sich die JVM deswegen nicht beenden konnte.
* Dies ist erst durch die Aktivierung des shutdown-Endpoints aufgefallen.
* Jetzt wurde in `Application` eine `@PreDestroy`-Methode ergänzt, die
den ExecutorService nach allen Regeln der Kunst ordentlich beendet.
Kai Moritz [Sat, 9 Apr 2022 11:21:43 +0000 (13:21 +0200)]
Refaktorisierung für Tests - ExecutorService als separate Bean erzeugt
Kai Moritz [Sat, 9 Apr 2022 09:36:29 +0000 (11:36 +0200)]
Refaktorisierung für Tests - Start des EndlessConsumer in ApplicationRunner
* Bisher wurde der EndlessConsumer beim erzeugen der Bean gestartet
* Dies ist für das Aufsetzen von Tests ungünstig, da die erzeugte Bean
dort nicht unbedingt direkt gestartet werden soll
* Daher wurde der Start des EndlessConsumer in einen ApplicationRunner
ausgelagert
Kai Moritz [Sat, 9 Apr 2022 09:46:51 +0000 (11:46 +0200)]
Refaktorisierung für Tests - Bean-Definition in Config-Klasse verschoben
Kai Moritz [Sat, 9 Apr 2022 09:21:43 +0000 (11:21 +0200)]
Refaktorisierung für Tests - KafkaConsumer als eigenständige Bean
* Der KafakConsumer wird als eigenständige Bean erzeugt
* Die Bean wird dem EndlessConsumer im Konstruktor übergeben
* Dafür muss der Lebenszyklus der KafkaConsumer-Bean von dem der
EndlessConsumer-Bean getrennt werden:
** close() darf nicht mehr im finally-Block im EndlessConsumer aufgerufen
werden
** Stattdessen muss close() als Destry-Methode der Bean definiert werden
** Für start/stop muss stattdessen unsubscribe() im finally-Block aufgerufen
werden
** Da unsubscribe() die Offset-Position nicht commited, muss explizit
ein Offsset-Commit beauftragt werden, wenn der Consumer regulär
gestoppt wird (WakeupException)
Kai Moritz [Sat, 9 Apr 2022 16:29:44 +0000 (18:29 +0200)]
Die Offsets werden ausgegeben, wenn eine Partition zugeteilt/entzogen wird
Kai Moritz [Sat, 9 Apr 2022 16:00:48 +0000 (18:00 +0200)]
Gesehene Schlüssel sollten als long gezählt werden
Kai Moritz [Sun, 10 Apr 2022 20:15:34 +0000 (22:15 +0200)]
Merge branch 'endless-stream-consumer' into rebalance-listener
Kai Moritz [Sun, 10 Apr 2022 19:56:37 +0000 (21:56 +0200)]
HealthIndicator implementiert
Kai Moritz [Sun, 10 Apr 2022 18:55:56 +0000 (20:55 +0200)]
shutdown-Endpoint aktiviert
Kai Moritz [Sun, 10 Apr 2022 18:55:42 +0000 (20:55 +0200)]
Informationen zur Kafka-Konfiguration im info-Endpoint sichtbar gemacht
Kai Moritz [Sun, 10 Apr 2022 18:55:13 +0000 (20:55 +0200)]
Informationen zur Java-Umgebung im info-Endpoint aktiviert
Kai Moritz [Sun, 10 Apr 2022 18:27:49 +0000 (20:27 +0200)]
Das Git-Commit-Id-Plugin generiert eine git.properties
Kai Moritz [Sun, 10 Apr 2022 18:25:28 +0000 (20:25 +0200)]
Das Spring-Boot-Maven-Plugin generiert Build-Info
Kai Moritz [Sun, 10 Apr 2022 17:52:09 +0000 (19:52 +0200)]
Default-Konfiguration überarbeitet
* Eine über die IDE bzw. Maven gestartete Instanz soll klar als solche
erkennbar sein (`client.id` = DEV), darf dafür aber im Compose-Setup
mitspielen (`group.id` = my-group).
* Bisher gab es häufig Port-Konflikte, wenn über die IDE bzw. über Maven
parallel zu einem Compose-Setup eine Instanz gestartet wurde. Daher wird
jetzt hier explizit auf einen abweichenden Port (8881) ausgewichen.
* Im Compose-Setup auch den neuen Port für den Producer übernommen
Kai Moritz [Sun, 10 Apr 2022 17:49:44 +0000 (19:49 +0200)]
Falschen Status-Code bei start/stop-Fehler korrigiert
Kai Moritz [Fri, 25 Mar 2022 14:20:00 +0000 (15:20 +0100)]
Verständliche Fehlermeldungen für DriverController
Kai Moritz [Sun, 10 Apr 2022 17:03:38 +0000 (19:03 +0200)]
Merge branch 'endless-stream-consumer' into rebalance-listener
Kai Moritz [Sun, 10 Apr 2022 16:58:16 +0000 (18:58 +0200)]
Thread-Synchronisation bei start/stop überarbeitet
Kai Moritz [Sat, 9 Apr 2022 16:50:43 +0000 (18:50 +0200)]
Fehlerbehandlung in EndlessConsumer.destroy() korrigiert
Kai Moritz [Sat, 9 Apr 2022 07:52:59 +0000 (09:52 +0200)]
Validierung erfolgt über spring-boot-starter-validation
Kai Moritz [Thu, 7 Apr 2022 07:25:41 +0000 (09:25 +0200)]
Rückbau auf verschachtelte Maps
* Die zuvor erfundenen fachlichen Klassen passen nicht zu dazu, dass man
sie - wie gedacht - direkt in MongoDB stopfen könnte.
* Daher hier erst mal der Rückbau auf Maps, da das dan für die Übungen
einfacher ist.
Kai Moritz [Wed, 6 Apr 2022 17:51:35 +0000 (19:51 +0200)]
counting-consumer ist eine Verbesserung von endless-consumer
Kai Moritz [Wed, 6 Apr 2022 06:45:57 +0000 (08:45 +0200)]
Benennung der neuen fachlichen Klassen verbessert
Kai Moritz [Wed, 6 Apr 2022 06:34:18 +0000 (08:34 +0200)]
Verschachtelte Map gegen fachliche Datenstruktur ausgetauscht
Kai Moritz [Tue, 5 Apr 2022 20:37:55 +0000 (22:37 +0200)]
Report über gesehene Schlüssel wiederbelebt
* An der alten Stelle war die Map ja jetzt nur noch leer, da dem Consumer
zu dem Zeitpunkt, an dem die gesehen Schlüssel ausgegeben wurden, bereits
alle Partitionen entzogen worden sind.
* Daher werden die gesehenen Schlüssel jetzt ausgegeben, wenn eine
Partition entzogen wird.
Kai Moritz [Tue, 5 Apr 2022 20:25:47 +0000 (22:25 +0200)]
Rebalance-Listener anstatt Wegwerfen der Map
Kai Moritz [Sun, 3 Apr 2022 10:19:47 +0000 (12:19 +0200)]
HTTPie gibt nicht nur den Response, sondern auch den Request aus
Kai Moritz [Sun, 3 Apr 2022 06:19:26 +0000 (08:19 +0200)]
Merge des Upgrades der Confluent-Images auf 7.0.2
Kai Moritz [Sun, 3 Apr 2022 06:18:08 +0000 (08:18 +0200)]
Merge des Upgrades der Confluent-Images auf 7.0.2
Kai Moritz [Sun, 3 Apr 2022 06:15:30 +0000 (08:15 +0200)]
Upgrade der Images von Confluent 6.2.0 auf 7.0.2
Kai Moritz [Sat, 2 Apr 2022 15:18:06 +0000 (17:18 +0200)]
Der Consumer erkennt die Änderung der Partitionierung schneller
* `metadata.max.age.ms` auf 1000 heruntergesetzt (Default 5 min)
* Dadurch wird die Änderung der Anzahl an Partitionen zeitnah erkannt
Kai Moritz [Sat, 2 Apr 2022 13:11:40 +0000 (15:11 +0200)]
Für null-Keys wird der String NULL gezählt
Kai Moritz [Sat, 2 Apr 2022 08:42:05 +0000 (10:42 +0200)]
README.sh zeigt die Auswirkung einer geänderten Partitions-Anzahl
Kai Moritz [Fri, 1 Apr 2022 20:02:28 +0000 (22:02 +0200)]
Der Consumer zählt jetzt die Nachrichten pro Key für jedes Topic
* Die Ergebnisse werden beim Beenden des Consumer ausgegeben
* Wenn der Consumer neu gestartet wird, werden die Ergebnisse zurückgesetzt
* Über /seen können Zwischenstände abgefragt werden
Kai Moritz [Fri, 1 Apr 2022 09:44:22 +0000 (11:44 +0200)]
`auto.offset.reset` konfigurierbar gemacht
Kai Moritz [Fri, 1 Apr 2022 09:40:14 +0000 (11:40 +0200)]
Fehler bei der Erzeugung des KafkaConsumer werden nicht mehr verschluckt
* Beim Erzeugen der Properties-Instanz können Exceptions fliegen
* Beim Erzeugen der KafkaConsumer-Instanz können Exception fliegen
* Daher wurden diese Schritte in den try/catch-Block verlegt
* Neben der Nachricht wird jetzt auch der ganze Stack-Trace gelogged
* Da die Erzeugung des KafkaConsumer jetzt im try/catch-Block geschieht,
wird der EndlessConsumer im Fehlerfall korrekt als beendet markiert
Kai Moritz [Thu, 31 Mar 2022 17:49:28 +0000 (19:49 +0200)]
Endless Consumer: a simple consumer, implemented as Spring-Boot-App
Kai Moritz [Fri, 1 Apr 2022 08:10:42 +0000 (10:10 +0200)]
SimpleConsumer in EndlessConsumer umbenannt
* Ohne die Umbenennung _vor_ der Veränderung würde die Versions-Historie
der Klasse verloren gehen!
Kai Moritz [Fri, 1 Apr 2022 09:56:34 +0000 (11:56 +0200)]
README.sh verwendet den cli-Service für Kommandos
Kai Moritz [Fri, 25 Mar 2022 14:27:50 +0000 (15:27 +0100)]
Fälschlich hartkodiertes Topic gegen Variable getauscht
Kai Moritz [Fri, 25 Mar 2022 10:19:03 +0000 (11:19 +0100)]
Code reorganisiert, um Änderungen vergleichbarer zu machen
Kai Moritz [Fri, 25 Mar 2022 08:25:54 +0000 (09:25 +0100)]
Upgrade für Spring Boot 2.6.0 -> 2.6.5
* Kafka: 3.0.1
* Spring Kafka: 2.8.4
Kai Moritz [Tue, 14 Dec 2021 17:55:02 +0000 (18:55 +0100)]
First Contact: Simple Producer & Consumer