Kai Moritz [Fri, 9 Sep 2022 09:46:25 +0000 (11:46 +0200)]
Backport von Verbesserungen / Erweiterungen der Tests:
* Integration-Test `ApplicationIT`, der prüft, ob die Spring-Boot
Anwendung ohne Fehler startet und dies über den Endpoint auch meldet.
* Inzwischen hinzugefügte `.editorconfig` übernommen.
* Fachspezifisches Interface `RecordHandler` statt `java.util.Consumer`.
* Kleinere Korrekturen / Verbesserungen an `GenericApplicationTests`
übernommen.
Kai Moritz [Thu, 18 Aug 2022 21:36:22 +0000 (23:36 +0200)]
GRÜN: Fehler in der Test-Logik korrigiert
* Die Assertion, dass nach einem wiederholten Versuch, den Logik-Fehler
zu konsumieren nicht mehr Nachrichten konsumiert wurden, als für den
Test generiert wurden ist nicht gültig, da bei einem Logik-Fehler ja
gerade _kein_ Commit der zuletzt gelesenen Nachrichten erfolgt, da
dies dazu führt, dass der Offset für Partitionen erhöht wird, für die
vor dem Eintreten des Fehlers noch nicht alle Nachrichten gelesen
wurden, wenn nicht explizti Seek's für diese Partitionen durchgeführt
werden.
* Die Assertion, dass die Offset-Position nach einem Fehler der Offset-
Position _vor_ der Ausführung der Fachlogik entspricht ist falsch, da
durchaus Commits durchgeführt werden können, bevor der Fehler auftritt.
Daher wird jetzt explizit geprüft, dass
** Die Offset-Position für keine Partition größer ist, als der Offset
der dort zuletzt gesehenen Nachricht.
** UND mindestens eine Partition existiert, deren Offset _kleiner_ ist,
als der Offset der zuletzt gesehenen Nachricht.
Kai Moritz [Fri, 19 Aug 2022 09:10:52 +0000 (11:10 +0200)]
ROT: Fehler in Test-Logik aufgedeckt
* Einige Assertions in dem Test für die Offset-Position nach einem
Logik-Fehler waren fehlerhaft.
* Dies ist bisher nicht aufgefallen, weil der Test nicht scharf genug
war: Er hat so wenig Nachrichten gesendet, dass die fehlerhaften
Assertions nicht aufgefallen sind, weil es nie zu einem Commit gekommen
ist, bevor der Fehler ausgelöst wurde.
* TODO: Der Test ist wahrscheinlich immer noch in hohem Maße abhängig
von der Ausführungsgeschwindigkeit auf dem Test-System. Besser wäre
es, wenn die Verarbeitung künstlich gedrosselt würde, so dass die
Timing-Annahmen zu den asynchron ablaufenden Operationen nicht auf
das Testsystem abgestimmt werden müssen.
Kai Moritz [Sun, 14 Aug 2022 13:35:28 +0000 (15:35 +0200)]
Signatur und Handling des `RecordGenerator` vereinfacht/überarbeitet
* Der `RecordGenerator` darf jetzt selbst bestimmen, wie viele Nachrichten
er erzeugt und wo wieviele Poison-Pills oder Logik-Fehler erzeugt
werden, wenn der Test dies anfordert.
* Dafür git der `RecordGenerator` jetzt die Anzahl der tatsächlich
erzeugten Nachrichten zurück, damit die Tests richtig reagieren können.
Kai Moritz [Sun, 14 Aug 2022 10:59:20 +0000 (12:59 +0200)]
`GenericApplicationTest` überspring Tests, wenn Fehler nicht verfügbar
* Über eine Annotation wird für Tests, die einen bestimmten Fehler-Typ
benötigen bei dem `RecordGenerator` nachgefragt, ob der Fehler-Typ
erzeugt werden kann.
* Wenn der Fehler-Typ nicht zur Verfügung steht, wird der Test
übersprungen.
Kai Moritz [Sun, 14 Aug 2022 10:06:01 +0000 (12:06 +0200)]
Tests aus gemerged springified-consumer--serialization -> deserialization
* Es wurde nur der hinzugefügte Test übernommen.
* Der hinzugefügte Test wurde an das von Spring-Kafka abweichende
Verhalten bei einem Logik-Fehler angepasst: Kafka führt nicht automatisch
Seeks oder einene Commit durch. Da `EndlessConsumer` bei einem
Logik-Fehler explizit ein `unsubscribe()` durchführt, wird kein
Offset-Commit durchgefürt, so dass die alten Offset-Positionen gültig
bleiben.
* Der Test wurde entsprechend umbenannt.
* `RecordGenerator` wurde um einen weiteren Integer-Set erweitert, über
den die Indizes der zu erzeugenden Logik-Fehler gesetzt werden können.
* Der hinzugefügte Test wurde auf die überarbeitete Methode zur Erzeugung
der Test-Nachrichten umgestellt.
* `ApplicationTest` wurde so ergänzt, dass der für den hinzugefügten Test
benötigte Logik-Fehler erzeugt wird.
Kai Moritz [Sun, 14 Aug 2022 09:32:10 +0000 (11:32 +0200)]
Typisierung in `GenericApplicationTest` nur noch, wo wirklich nötig
* Es wird nur noch dort mit Typisierung gearbeitet, wo dies unumgänglich
ist, weil die typisierte Implementierung angesprochen wird.
* Das Versenden der Test-Nachrichten erfolgt als `Bytes` für Schlüssel
und Nachricht.
* Dadurch muss der `RecordGenerator` nicht mehr typisiert werden.
* Dafür muss die typisierte Implementierung des Testfalls dann Schlüssel
und Nachricht mit einem passenden Serializer in eine `Bytes`-Payload
umwandeln.
Kai Moritz [Fri, 12 Aug 2022 20:31:24 +0000 (22:31 +0200)]
Compose-Setup und README.sh für dieses Beispiel repariert
* Zuvor war in dem Setup noch ein Producer konfiguriert, der Nachrichten
vom Typ `String` geschrieben hat, so dass der Consumer _sofort_ das
zeitliche gesegnet hat.
* Im README-Skript wurde nicht darauf gewartet, dass der Consumer
gemeldet hat, dass er ordentlich gestartet ist, bevor er nach der
vermeintlichen Konsumption der Poison-Pill wieder neu gestartet wurde.
Kai Moritz [Tue, 26 Jul 2022 14:03:10 +0000 (16:03 +0200)]
Verhalten des Testfalls kontrollierbarer gemacht
* Die Awaitility-Aufrufe pollen den zu prüfenden Zustand wenn nicht anders
angegeben so häufig, wie es die CPU zulässt - also ohne Verzögerung
zwischen den Überprüfungen.
* Das kann den Rechner temporär so überlasten, dass der erwartete Zustand
in der abgewarteten Zeit gar nicht eintritt!
* Z.B. aufgetreten, wenn wie hier das Commit-Interval auf 1 Sekunde
gesetzt ist, das Polling von Awaitility aber noch ungebremst durchgeführt
wird.
* Um diese Quelle für falsche Fehler auszuschließen, wurde jetzt
durchgängig ein Poll-Intervall von 1 Sekunde für Awaitility gesetzt.
Kai Moritz [Tue, 26 Jul 2022 13:37:43 +0000 (15:37 +0200)]
Testfall überarbeitet
* Abhängigkeit der Testergebnisse von Ausführreihenfolge beseitigt.
* Die Abhängigkeit bestand, da die Offset-Positionen als Zustand die
Testausführung überdauert haben.
* Daher konnte kein weiterer Test mehr ausgeführt werden, nachdem einmal
eine Poison-Pill in das Topic geschrieben wurde, über die der
implementierte Consumer stolpert.
* Um das zu umgehen, werden die Offset-Positionen jetzt nach jedem Test
auf das Ende der Partitionen verschoben. D.h., wenn in dem Test eine
Poision-Pill geschrieben wird, über die der implementierte Consumer
nicht hinweglesen kann, werden die Offests vor der Ausführung des
nächsten Tests über diese Poision-Pill hinweg gesetzt.
* Dadurch ist wurde ein Fehler / eine Schwäche in der Testlogik aufgedeckt:
In dem Test für das erfolgreiche Schreiben wurde nur deswegen ein Commit
ausgeführt, weil zuvor noch kein Commit durchgeführt wurde, so dass der
Default-Wert für das Commit-Interval immer überschritten war.
* Um das zu umgehen, wurde eine Konfigurations-Option für das Setzen des
Parameters `auto.commit.interval` eingeführt, so dass im Test
sichergestellt werden kann, dass auf jeden Fall in dem beobachteten
Zeitraum ein automatischer Commit ausgelöst wird.
* Außerdem: Weniger verwirrende Ausgabe des Offset-Fortschritts.
Kai Moritz [Sat, 23 Jul 2022 11:26:29 +0000 (13:26 +0200)]
Compose-Konfiguration unabhängig von Default-Konfiguration gemacht
* Damit Instanzen parallel über die IDE (mit voreingestelltem Default-Port)
und Compose gestartet werden können, wurden den einzelnen Komponenten
(Producer, Consumer etc.) jeweils unterschiedliche explizite
Default-Ports zugewiesen.
* Dies führt leicht zu fehlern, in den Compose-Setups, da dort i.d.R.
Port-Mappings für die gestarteten Instanzen definiert werden.
* Daher werden die Compose-Setups jetzt so umgestellt, dass sie den
einkompilierten Default-Port der Komponenten explizit mit dem Port `8080`
überschreiben, so dass alle Komponenten _innerhalb_ von Compose
einheitlich (und so wie bei Spring-Boot standard) über `8080` ansprechbar
sind.
Kai Moritz [Fri, 22 Jul 2022 18:04:07 +0000 (20:04 +0200)]
Upgrade von Spring Boot und den Confluent-Kafka-Images
* Upgrade der Kafk-Images von Confluent 7.0.2 auf 7.1.3
** Unterstützt Kafka 3.1.x (siehe https://docs.confluent.io/platform/current/installation/versions-interoperability.html[Versions-Matrix])
* Upgrade für Spring Boot von 2.6.5 auf 2.7.2
** Enthält Kafka: 3.1.1
** Enthält Spring Kafka: 2.8.8
Kai Moritz [Sun, 17 Apr 2022 11:33:40 +0000 (13:33 +0200)]
Springify: Der Payload ist eine als JSON gerenderte Klasse
* Als Nachricht wird eine Instanz der Klasse `ClientMessage` erwartet
* Die Instanz wird mit Hilfe des `JsonDeserializer` von Spring Kafka
deserialisiert.
Kai Moritz [Mon, 18 Apr 2022 10:41:47 +0000 (12:41 +0200)]
Tests: Refaktorisiert - Durcheinander bei Assertions aufgeräumt
* Zuvor wurden die Assertions von JUnit Jupiter und AssertJ durcheinander
gewürfelt verwendet.
* Jetzt werden stringent nur noch die Assertions von AssertJ verwendet.
Kai Moritz [Mon, 11 Apr 2022 07:41:40 +0000 (09:41 +0200)]
Tests: Der Test wartet, bis die Offsets regulär committed wurden
* Zuvor wurde der Offset-Commit erzwungen, indem der EndlessConsumer
über den Aufruf von `stop()` beendet wurde.
* Jetzt wird die Überprüfung der Erwartungen über awaitility aufgeschoben,
bis die Erwartungen beobachtet werden können - oder eine Zeitschranke
gerissen wird.
Kai Moritz [Sun, 10 Apr 2022 20:50:44 +0000 (22:50 +0200)]
Tests: Der EndlessConsumer wird jetzt doch asynchron ausgeführt
* Der Test-Code wird verständlicher, wenn der Consumer asynchron läuft
* Für die Überprüfung der Erwartungen wird dann awaitility benötigt
** Da der Consumer in einem separaten Thread läuft, muss auf diesen
gezielt gewartet werden
** Dafür wird es deutlicher/verständlicher, auf welche der auch aus dem
regulären Betrieb des EndlessConsumer bekannten Zustandsänderungen
der Test wartet
* Da die Offsets für die Controlle (ggf.) abgefragt werden, während der
EndlessConsumer noch läuft, wird ein separater KafkaConsumer benötigt,
um die Offsets abzurufen.
BEACHTE: Um die Änderungen in dem Offsets-Topic zu greifen zu bekommen,
muss dieser KafkaConsumer für jede Abfrage ein `assign()` durchführen,
damit er gezwungen ist, die Offsets neu vom Broker zu erfragen.
Kai Moritz [Sat, 9 Apr 2022 11:40:47 +0000 (13:40 +0200)]
Refaktorisierung für Tests - Record-Handler als Bean konfigurierbar
* Ein Handler für die Verarbeitung der einzelnen ConsumerRecord's kann
jetzt ein `java.util.function.Consumer` als Bean definiert werden
* Die Nachricht wird erst nach dem Aufruf des Handlers als konsumiert
gezählt, damit der Handler die Nachricht über eine Exception ablehnen
kann.
Kai Moritz [Mon, 11 Apr 2022 08:51:01 +0000 (10:51 +0200)]
Die Spring-Boot App wird nun sauber herunter gefahren
* Zuvor wurde die App nicht richtig beendet, weil der ExecutorService nicht
beendet wurde und sich die JVM deswegen nicht beenden konnte.
* Dies ist erst durch die Aktivierung des shutdown-Endpoints aufgefallen.
* Jetzt wurde in `Application` eine `@PreDestroy`-Methode ergänzt, die
den ExecutorService nach allen Regeln der Kunst ordentlich beendet.
Kai Moritz [Sat, 9 Apr 2022 09:36:29 +0000 (11:36 +0200)]
Refaktorisierung für Tests - Start des EndlessConsumer in ApplicationRunner
* Bisher wurde der EndlessConsumer beim erzeugen der Bean gestartet
* Dies ist für das Aufsetzen von Tests ungünstig, da die erzeugte Bean
dort nicht unbedingt direkt gestartet werden soll
* Daher wurde der Start des EndlessConsumer in einen ApplicationRunner
ausgelagert
Kai Moritz [Sat, 9 Apr 2022 09:21:43 +0000 (11:21 +0200)]
Refaktorisierung für Tests - KafkaConsumer als eigenständige Bean
* Der KafakConsumer wird als eigenständige Bean erzeugt
* Die Bean wird dem EndlessConsumer im Konstruktor übergeben
* Dafür muss der Lebenszyklus der KafkaConsumer-Bean von dem der
EndlessConsumer-Bean getrennt werden:
** close() darf nicht mehr im finally-Block im EndlessConsumer aufgerufen
werden
** Stattdessen muss close() als Destry-Methode der Bean definiert werden
** Für start/stop muss stattdessen unsubscribe() im finally-Block aufgerufen
werden
** Da unsubscribe() die Offset-Position nicht commited, muss explizit
ein Offsset-Commit beauftragt werden, wenn der Consumer regulär
gestoppt wird (WakeupException)
Kai Moritz [Sun, 10 Apr 2022 17:52:09 +0000 (19:52 +0200)]
Default-Konfiguration überarbeitet
* Eine über die IDE bzw. Maven gestartete Instanz soll klar als solche
erkennbar sein (`client.id` = DEV), darf dafür aber im Compose-Setup
mitspielen (`group.id` = my-group).
* Bisher gab es häufig Port-Konflikte, wenn über die IDE bzw. über Maven
parallel zu einem Compose-Setup eine Instanz gestartet wurde. Daher wird
jetzt hier explizit auf einen abweichenden Port (8881) ausgewichen.
* Im Compose-Setup auch den neuen Port für den Producer übernommen
Kai Moritz [Thu, 7 Apr 2022 07:25:41 +0000 (09:25 +0200)]
Rückbau auf verschachtelte Maps
* Die zuvor erfundenen fachlichen Klassen passen nicht zu dazu, dass man
sie - wie gedacht - direkt in MongoDB stopfen könnte.
* Daher hier erst mal der Rückbau auf Maps, da das dan für die Übungen
einfacher ist.
Kai Moritz [Tue, 5 Apr 2022 20:37:55 +0000 (22:37 +0200)]
Report über gesehene Schlüssel wiederbelebt
* An der alten Stelle war die Map ja jetzt nur noch leer, da dem Consumer
zu dem Zeitpunkt, an dem die gesehen Schlüssel ausgegeben wurden, bereits
alle Partitionen entzogen worden sind.
* Daher werden die gesehenen Schlüssel jetzt ausgegeben, wenn eine
Partition entzogen wird.
Kai Moritz [Fri, 1 Apr 2022 20:02:28 +0000 (22:02 +0200)]
Der Consumer zählt jetzt die Nachrichten pro Key für jedes Topic
* Die Ergebnisse werden beim Beenden des Consumer ausgegeben
* Wenn der Consumer neu gestartet wird, werden die Ergebnisse zurückgesetzt
* Über /seen können Zwischenstände abgefragt werden
Kai Moritz [Fri, 1 Apr 2022 09:40:14 +0000 (11:40 +0200)]
Fehler bei der Erzeugung des KafkaConsumer werden nicht mehr verschluckt
* Beim Erzeugen der Properties-Instanz können Exceptions fliegen
* Beim Erzeugen der KafkaConsumer-Instanz können Exception fliegen
* Daher wurden diese Schritte in den try/catch-Block verlegt
* Neben der Nachricht wird jetzt auch der ganze Stack-Trace gelogged
* Da die Erzeugung des KafkaConsumer jetzt im try/catch-Block geschieht,
wird der EndlessConsumer im Fehlerfall korrekt als beendet markiert