Kai Moritz [Sun, 14 Aug 2022 10:06:01 +0000 (12:06 +0200)]
Tests aus gemerged springified-consumer--serialization -> deserialization
* Es wurde nur der hinzugefügte Test übernommen.
* Der hinzugefügte Test wurde an das von Spring-Kafka abweichende
Verhalten bei einem Logik-Fehler angepasst: Kafka führt nicht automatisch
Seeks oder einene Commit durch. Da `EndlessConsumer` bei einem
Logik-Fehler explizit ein `unsubscribe()` durchführt, wird kein
Offset-Commit durchgefürt, so dass die alten Offset-Positionen gültig
bleiben.
* Der Test wurde entsprechend umbenannt.
* `RecordGenerator` wurde um einen weiteren Integer-Set erweitert, über
den die Indizes der zu erzeugenden Logik-Fehler gesetzt werden können.
* Der hinzugefügte Test wurde auf die überarbeitete Methode zur Erzeugung
der Test-Nachrichten umgestellt.
* `ApplicationTest` wurde so ergänzt, dass der für den hinzugefügten Test
benötigte Logik-Fehler erzeugt wird.
Kai Moritz [Sun, 14 Aug 2022 09:32:10 +0000 (11:32 +0200)]
Typisierung in `GenericApplicationTest` nur noch, wo wirklich nötig
* Es wird nur noch dort mit Typisierung gearbeitet, wo dies unumgänglich
ist, weil die typisierte Implementierung angesprochen wird.
* Das Versenden der Test-Nachrichten erfolgt als `Bytes` für Schlüssel
und Nachricht.
* Dadurch muss der `RecordGenerator` nicht mehr typisiert werden.
* Dafür muss die typisierte Implementierung des Testfalls dann Schlüssel
und Nachricht mit einem passenden Serializer in eine `Bytes`-Payload
umwandeln.
Kai Moritz [Fri, 12 Aug 2022 20:31:24 +0000 (22:31 +0200)]
Compose-Setup und README.sh für dieses Beispiel repariert
* Zuvor war in dem Setup noch ein Producer konfiguriert, der Nachrichten
vom Typ `String` geschrieben hat, so dass der Consumer _sofort_ das
zeitliche gesegnet hat.
* Im README-Skript wurde nicht darauf gewartet, dass der Consumer
gemeldet hat, dass er ordentlich gestartet ist, bevor er nach der
vermeintlichen Konsumption der Poison-Pill wieder neu gestartet wurde.
Kai Moritz [Tue, 26 Jul 2022 14:03:10 +0000 (16:03 +0200)]
Verhalten des Testfalls kontrollierbarer gemacht
* Die Awaitility-Aufrufe pollen den zu prüfenden Zustand wenn nicht anders
angegeben so häufig, wie es die CPU zulässt - also ohne Verzögerung
zwischen den Überprüfungen.
* Das kann den Rechner temporär so überlasten, dass der erwartete Zustand
in der abgewarteten Zeit gar nicht eintritt!
* Z.B. aufgetreten, wenn wie hier das Commit-Interval auf 1 Sekunde
gesetzt ist, das Polling von Awaitility aber noch ungebremst durchgeführt
wird.
* Um diese Quelle für falsche Fehler auszuschließen, wurde jetzt
durchgängig ein Poll-Intervall von 1 Sekunde für Awaitility gesetzt.
Kai Moritz [Tue, 26 Jul 2022 13:37:43 +0000 (15:37 +0200)]
Testfall überarbeitet
* Abhängigkeit der Testergebnisse von Ausführreihenfolge beseitigt.
* Die Abhängigkeit bestand, da die Offset-Positionen als Zustand die
Testausführung überdauert haben.
* Daher konnte kein weiterer Test mehr ausgeführt werden, nachdem einmal
eine Poison-Pill in das Topic geschrieben wurde, über die der
implementierte Consumer stolpert.
* Um das zu umgehen, werden die Offset-Positionen jetzt nach jedem Test
auf das Ende der Partitionen verschoben. D.h., wenn in dem Test eine
Poision-Pill geschrieben wird, über die der implementierte Consumer
nicht hinweglesen kann, werden die Offests vor der Ausführung des
nächsten Tests über diese Poision-Pill hinweg gesetzt.
* Dadurch ist wurde ein Fehler / eine Schwäche in der Testlogik aufgedeckt:
In dem Test für das erfolgreiche Schreiben wurde nur deswegen ein Commit
ausgeführt, weil zuvor noch kein Commit durchgeführt wurde, so dass der
Default-Wert für das Commit-Interval immer überschritten war.
* Um das zu umgehen, wurde eine Konfigurations-Option für das Setzen des
Parameters `auto.commit.interval` eingeführt, so dass im Test
sichergestellt werden kann, dass auf jeden Fall in dem beobachteten
Zeitraum ein automatischer Commit ausgelöst wird.
* Außerdem: Weniger verwirrende Ausgabe des Offset-Fortschritts.
Kai Moritz [Sat, 23 Jul 2022 11:26:29 +0000 (13:26 +0200)]
Compose-Konfiguration unabhängig von Default-Konfiguration gemacht
* Damit Instanzen parallel über die IDE (mit voreingestelltem Default-Port)
und Compose gestartet werden können, wurden den einzelnen Komponenten
(Producer, Consumer etc.) jeweils unterschiedliche explizite
Default-Ports zugewiesen.
* Dies führt leicht zu fehlern, in den Compose-Setups, da dort i.d.R.
Port-Mappings für die gestarteten Instanzen definiert werden.
* Daher werden die Compose-Setups jetzt so umgestellt, dass sie den
einkompilierten Default-Port der Komponenten explizit mit dem Port `8080`
überschreiben, so dass alle Komponenten _innerhalb_ von Compose
einheitlich (und so wie bei Spring-Boot standard) über `8080` ansprechbar
sind.
Kai Moritz [Fri, 22 Jul 2022 18:04:07 +0000 (20:04 +0200)]
Upgrade von Spring Boot und den Confluent-Kafka-Images
* Upgrade der Kafk-Images von Confluent 7.0.2 auf 7.1.3
** Unterstützt Kafka 3.1.x (siehe https://docs.confluent.io/platform/current/installation/versions-interoperability.html[Versions-Matrix])
* Upgrade für Spring Boot von 2.6.5 auf 2.7.2
** Enthält Kafka: 3.1.1
** Enthält Spring Kafka: 2.8.8
Kai Moritz [Sun, 17 Apr 2022 11:33:40 +0000 (13:33 +0200)]
Springify: Der Payload ist eine als JSON gerenderte Klasse
* Als Nachricht wird eine Instanz der Klasse `ClientMessage` erwartet
* Die Instanz wird mit Hilfe des `JsonDeserializer` von Spring Kafka
deserialisiert.
Kai Moritz [Mon, 18 Apr 2022 10:41:47 +0000 (12:41 +0200)]
Tests: Refaktorisiert - Durcheinander bei Assertions aufgeräumt
* Zuvor wurden die Assertions von JUnit Jupiter und AssertJ durcheinander
gewürfelt verwendet.
* Jetzt werden stringent nur noch die Assertions von AssertJ verwendet.
Kai Moritz [Mon, 11 Apr 2022 07:41:40 +0000 (09:41 +0200)]
Tests: Der Test wartet, bis die Offsets regulär committed wurden
* Zuvor wurde der Offset-Commit erzwungen, indem der EndlessConsumer
über den Aufruf von `stop()` beendet wurde.
* Jetzt wird die Überprüfung der Erwartungen über awaitility aufgeschoben,
bis die Erwartungen beobachtet werden können - oder eine Zeitschranke
gerissen wird.
Kai Moritz [Sun, 10 Apr 2022 20:50:44 +0000 (22:50 +0200)]
Tests: Der EndlessConsumer wird jetzt doch asynchron ausgeführt
* Der Test-Code wird verständlicher, wenn der Consumer asynchron läuft
* Für die Überprüfung der Erwartungen wird dann awaitility benötigt
** Da der Consumer in einem separaten Thread läuft, muss auf diesen
gezielt gewartet werden
** Dafür wird es deutlicher/verständlicher, auf welche der auch aus dem
regulären Betrieb des EndlessConsumer bekannten Zustandsänderungen
der Test wartet
* Da die Offsets für die Controlle (ggf.) abgefragt werden, während der
EndlessConsumer noch läuft, wird ein separater KafkaConsumer benötigt,
um die Offsets abzurufen.
BEACHTE: Um die Änderungen in dem Offsets-Topic zu greifen zu bekommen,
muss dieser KafkaConsumer für jede Abfrage ein `assign()` durchführen,
damit er gezwungen ist, die Offsets neu vom Broker zu erfragen.
Kai Moritz [Sat, 9 Apr 2022 11:40:47 +0000 (13:40 +0200)]
Refaktorisierung für Tests - Record-Handler als Bean konfigurierbar
* Ein Handler für die Verarbeitung der einzelnen ConsumerRecord's kann
jetzt ein `java.util.function.Consumer` als Bean definiert werden
* Die Nachricht wird erst nach dem Aufruf des Handlers als konsumiert
gezählt, damit der Handler die Nachricht über eine Exception ablehnen
kann.
Kai Moritz [Mon, 11 Apr 2022 08:51:01 +0000 (10:51 +0200)]
Die Spring-Boot App wird nun sauber herunter gefahren
* Zuvor wurde die App nicht richtig beendet, weil der ExecutorService nicht
beendet wurde und sich die JVM deswegen nicht beenden konnte.
* Dies ist erst durch die Aktivierung des shutdown-Endpoints aufgefallen.
* Jetzt wurde in `Application` eine `@PreDestroy`-Methode ergänzt, die
den ExecutorService nach allen Regeln der Kunst ordentlich beendet.
Kai Moritz [Sat, 9 Apr 2022 09:36:29 +0000 (11:36 +0200)]
Refaktorisierung für Tests - Start des EndlessConsumer in ApplicationRunner
* Bisher wurde der EndlessConsumer beim erzeugen der Bean gestartet
* Dies ist für das Aufsetzen von Tests ungünstig, da die erzeugte Bean
dort nicht unbedingt direkt gestartet werden soll
* Daher wurde der Start des EndlessConsumer in einen ApplicationRunner
ausgelagert
Kai Moritz [Sat, 9 Apr 2022 09:21:43 +0000 (11:21 +0200)]
Refaktorisierung für Tests - KafkaConsumer als eigenständige Bean
* Der KafakConsumer wird als eigenständige Bean erzeugt
* Die Bean wird dem EndlessConsumer im Konstruktor übergeben
* Dafür muss der Lebenszyklus der KafkaConsumer-Bean von dem der
EndlessConsumer-Bean getrennt werden:
** close() darf nicht mehr im finally-Block im EndlessConsumer aufgerufen
werden
** Stattdessen muss close() als Destry-Methode der Bean definiert werden
** Für start/stop muss stattdessen unsubscribe() im finally-Block aufgerufen
werden
** Da unsubscribe() die Offset-Position nicht commited, muss explizit
ein Offsset-Commit beauftragt werden, wenn der Consumer regulär
gestoppt wird (WakeupException)
Kai Moritz [Sun, 10 Apr 2022 17:52:09 +0000 (19:52 +0200)]
Default-Konfiguration überarbeitet
* Eine über die IDE bzw. Maven gestartete Instanz soll klar als solche
erkennbar sein (`client.id` = DEV), darf dafür aber im Compose-Setup
mitspielen (`group.id` = my-group).
* Bisher gab es häufig Port-Konflikte, wenn über die IDE bzw. über Maven
parallel zu einem Compose-Setup eine Instanz gestartet wurde. Daher wird
jetzt hier explizit auf einen abweichenden Port (8881) ausgewichen.
* Im Compose-Setup auch den neuen Port für den Producer übernommen
Kai Moritz [Thu, 7 Apr 2022 07:25:41 +0000 (09:25 +0200)]
Rückbau auf verschachtelte Maps
* Die zuvor erfundenen fachlichen Klassen passen nicht zu dazu, dass man
sie - wie gedacht - direkt in MongoDB stopfen könnte.
* Daher hier erst mal der Rückbau auf Maps, da das dan für die Übungen
einfacher ist.
Kai Moritz [Tue, 5 Apr 2022 20:37:55 +0000 (22:37 +0200)]
Report über gesehene Schlüssel wiederbelebt
* An der alten Stelle war die Map ja jetzt nur noch leer, da dem Consumer
zu dem Zeitpunkt, an dem die gesehen Schlüssel ausgegeben wurden, bereits
alle Partitionen entzogen worden sind.
* Daher werden die gesehenen Schlüssel jetzt ausgegeben, wenn eine
Partition entzogen wird.
Kai Moritz [Fri, 1 Apr 2022 20:02:28 +0000 (22:02 +0200)]
Der Consumer zählt jetzt die Nachrichten pro Key für jedes Topic
* Die Ergebnisse werden beim Beenden des Consumer ausgegeben
* Wenn der Consumer neu gestartet wird, werden die Ergebnisse zurückgesetzt
* Über /seen können Zwischenstände abgefragt werden
Kai Moritz [Fri, 1 Apr 2022 09:40:14 +0000 (11:40 +0200)]
Fehler bei der Erzeugung des KafkaConsumer werden nicht mehr verschluckt
* Beim Erzeugen der Properties-Instanz können Exceptions fliegen
* Beim Erzeugen der KafkaConsumer-Instanz können Exception fliegen
* Daher wurden diese Schritte in den try/catch-Block verlegt
* Neben der Nachricht wird jetzt auch der ganze Stack-Trace gelogged
* Da die Erzeugung des KafkaConsumer jetzt im try/catch-Block geschieht,
wird der EndlessConsumer im Fehlerfall korrekt als beendet markiert