demos/kafka/training
21 months agoMerge der überarbeiteten Compose-Konfiguration ('deserialization') springify-experiments
Kai Moritz [Sat, 23 Jul 2022 14:25:02 +0000 (16:25 +0200)]
Merge der überarbeiteten Compose-Konfiguration ('deserialization')

21 months agoMerge der überarbeiteten Compose-Konfiguration ('rebalance-listener')
Kai Moritz [Sat, 23 Jul 2022 14:15:29 +0000 (16:15 +0200)]
Merge der überarbeiteten Compose-Konfiguration ('rebalance-listener')

21 months agoMerge der überarbeiteten Compose-Konfiguration ('counting-consumer')
Kai Moritz [Sat, 23 Jul 2022 13:35:01 +0000 (15:35 +0200)]
Merge der überarbeiteten Compose-Konfiguration ('counting-consumer')

21 months agoMerge der überarbeiteten Compose-Konfiguration ('endless-stream-consumer')
Kai Moritz [Sat, 23 Jul 2022 11:49:23 +0000 (13:49 +0200)]
Merge der überarbeiteten Compose-Konfiguration ('endless-stream-consumer')

* Die letzten Änderungen an 'endless-stream-consumer' sind länger nicht
  mehr gemerged worden.

21 months agoCompose-Konfiguration unabhängig von Default-Konfiguration gemacht
Kai Moritz [Sat, 23 Jul 2022 11:26:29 +0000 (13:26 +0200)]
Compose-Konfiguration unabhängig von Default-Konfiguration gemacht

* Damit Instanzen parallel über die IDE (mit voreingestelltem Default-Port)
  und Compose gestartet werden können, wurden den einzelnen Komponenten
  (Producer, Consumer etc.) jeweils unterschiedliche explizite
  Default-Ports zugewiesen.
* Dies führt leicht zu fehlern, in den Compose-Setups, da dort i.d.R.
  Port-Mappings für die gestarteten Instanzen definiert werden.
* Daher werden die Compose-Setups jetzt so umgestellt, dass sie den
  einkompilierten Default-Port der Komponenten explizit mit dem Port `8080`
  überschreiben, so dass alle Komponenten _innerhalb_ von Compose
  einheitlich (und so wie bei Spring-Boot standard) über `8080` ansprechbar
  sind.

21 months agoMerge der Upgrades für Confluent/Spring-Boot (Branch 'first-contact')
Kai Moritz [Sat, 23 Jul 2022 08:37:24 +0000 (10:37 +0200)]
Merge der Upgrades für Confluent/Spring-Boot (Branch 'first-contact')

21 months agoUpgrade von Spring Boot und den Confluent-Kafka-Images
Kai Moritz [Fri, 22 Jul 2022 18:04:07 +0000 (20:04 +0200)]
Upgrade von Spring Boot und den Confluent-Kafka-Images

* Upgrade der Kafk-Images von Confluent 7.0.2 auf 7.1.3
** Unterstützt Kafka 3.1.x (siehe https://docs.confluent.io/platform/current/installation/versions-interoperability.html[Versions-Matrix])
* Upgrade für Spring Boot von 2.6.5 auf 2.7.2
** Enthält Kafka: 3.1.1
** Enthält Spring Kafka: 2.8.8

23 months agoSpringify: Setup und README-Skript angepasst
Kai Moritz [Tue, 31 May 2022 03:27:57 +0000 (05:27 +0200)]
Springify: Setup und README-Skript angepasst

* Das Docker-Setup startet die Versionen von Producer und Consumer, die
  verschiedene Nachrichten über das Topic senden.
* Dabei auch einen neuen Artefakt-Namen für die Variante festgelegt.
* Das Skript erzeugt für jeden vom Producer erzeugten Nachrichten-Typ eine
  Nachricht.
* Außerdem erzeugt es auch weiterhin die Poison-Pill, die den Consumer
  jetzt aber nicht mehr flachlegt, sondern in der Dead-Letter-Queue landet.
* Daher gibt das Skript am Ende auch den Inhalt der Dead-Letter-Queue aus.

23 months agoSpringify: Test für das lesen gemischter Nachrichten mit aller Fehler-Typen
Kai Moritz [Sun, 5 Jun 2022 05:14:54 +0000 (07:14 +0200)]
Springify: Test für das lesen gemischter Nachrichten mit aller Fehler-Typen

23 months agoSpringify: Einen Test für das Empfangen gemischter Nachrichten hinzugefügt
Kai Moritz [Sun, 5 Jun 2022 04:29:51 +0000 (06:29 +0200)]
Springify: Einen Test für das Empfangen gemischter Nachrichten hinzugefügt

23 months agoSpringify: Testfall repariert - Seltsames Verhalten von `@KafkaHandler`!
Kai Moritz [Sat, 4 Jun 2022 09:41:19 +0000 (11:41 +0200)]
Springify: Testfall repariert - Seltsames Verhalten von `@KafkaHandler`!

* Um den Testfall zu reparieren, musste die Implementierung so angepasst
  werden, dass die Handler wie zuvor die `RecordMetadata` von Kafka
  erhalten.
* Dafür nehmen die `receive`-Methoden von `EndlessConsumer` und die
  Handler, die von diesen aufgerufen werden jetzt zwei Parameter entgegen:
  * Die deserialisierte Nachricht (`ClientMessage` oder `Greeting`)
  * Die `ConsumerRecordMetaData`
* Wenn die Methoden den zweiten Parameter annehmen, werden die Nachrichten
  von Spring Kafka den Handler-Methoden nicht mehr korrekt übergeben: Es
  kommt zu einem Fehler wegen einem angeblich fehlendem Resolver, der nur
  umschifft werden kann, indem eine weitere Default-Handler-Methode
  hinzugefügt wird.
  Da dies nicht nötig ist, wenn nur die Nachricht übergeben wird, richt
  das ganze sehr nach einem Bug...

23 months agoSpringify: Der Consumer kann unterschiedliche Nachrichten-Typen empfangen
Kai Moritz [Sat, 28 May 2022 16:27:45 +0000 (18:27 +0200)]
Springify: Der Consumer kann unterschiedliche Nachrichten-Typen empfangen

* Neben `ClientMessage` gibt es jetzt auch noch den Nachrichten-Typ
  `Greeting`.
* `ClientMessage` ist auf den Bezeichner `message` gemapped.
* `Greeting` ist auf den Bezeichner `greeting` gemapped.
* Für die beiden bekannten Nachrichten-Typen sind via `@KafkaHandler`
  separate Handler konfiguriert.
* Unbekannte Nachrichten (z.B. `FooMessage`) landen zusammen mit anderen
  Fehlern in dem DLQ-Topic.

23 months agoDie IntelliJ-IDE will nicht ohne explizite Compiler-Version
Kai Moritz [Wed, 11 May 2022 16:09:46 +0000 (18:09 +0200)]
Die IntelliJ-IDE will nicht ohne explizite Compiler-Version

23 months agoSporadische Timing-Fehler in dem Test ausgeschlossen
Kai Moritz [Wed, 11 May 2022 16:25:36 +0000 (18:25 +0200)]
Sporadische Timing-Fehler in dem Test ausgeschlossen

* Ohne die Verzögerung des Pollings von Awaitility schlug der neue
  Test manchmal fehl, wenn der letzte Fehler, der auf die DLQ verschoben
  werden musste, genau auf dem letzten verarbeiteten Datensatz aufgetreten
  ist.
* Der Timeing-Fehler kann gezielt ausgelöst werden, wenn das Poll-Intervall
  quasi auf Null (1 Nanosekunde) gesetzt wird.
* Das zurückhaltendere Polling führt dazu, dass der Timing-Fehler im Test
  nicht mehr auftritt.
* _Schluss:_ Das Tooling war durch das penetrannte Polling von Awaitility
  einfach so überlastet, dass der Offset-Commit nicht schnell genug
  angekommen ist.

23 months agoSpringify: Gemeinsame DLQ für Poison Pills und Fachlogik-Fehler konfiguriert
Kai Moritz [Wed, 11 May 2022 17:23:40 +0000 (19:23 +0200)]
Springify: Gemeinsame DLQ für Poison Pills und Fachlogik-Fehler konfiguriert

23 months agoEindeutigere Log-Meldungen für den Test
Kai Moritz [Wed, 11 May 2022 16:00:43 +0000 (18:00 +0200)]
Eindeutigere Log-Meldungen für den Test

23 months agoSpringify: DLQ für Poison Pills konfiguriert
Kai Moritz [Sat, 23 Apr 2022 08:11:17 +0000 (10:11 +0200)]
Springify: DLQ für Poison Pills konfiguriert

* Den Producer so konfiguriert, dass er einenen `ByteArraySerializer`
  verwendet.
* Dafür wird der `DefaultErrorHandler` explizit konfiguriert und mit
  einem `DeadLetterPublishingRecoverer` konfiguriert.
* Der `DefaultErrorHandler` ist mit einer `FixedBackOff`-Strategie
  konfiguriert, die die Nachricht direkt nach dem ersten Fehler auf
  das DLQ-Topic umleitet.
* Damit der Producer die als `byte[]` übergebene fehlerhafte Nachricht
  serialisieren kann, wurde er mit einem `ByteArraySerializer`
  konfiguriert.
* *TODO:* Auch Exceptions, die in dem `MessageListener` also dem
  `EndlessConsumer` geworfen werden, werden von dem im
  `DefaultErrorHandler` konfigurierten `DeadLetterPublishingRecoverer`
  in die DLQ geschickt, Problem dabei:
** Der Producer kann diese so wie hier konfiguriert noch nicht
   serialisieren.
** Für diese Fehler wäre aber auch eh eigentlich das Stop-World-Verhalten
   zu bevorzugen.

2 years agoFehler in Shutdown-Logik für den `ExecutorService` korrigiert
Kai Moritz [Sat, 30 Apr 2022 09:36:13 +0000 (11:36 +0200)]
Fehler in Shutdown-Logik für den `ExecutorService` korrigiert

2 years agoMerge branch 'rebalance-listener' into deserialization
Kai Moritz [Sat, 30 Apr 2022 09:34:38 +0000 (11:34 +0200)]
Merge branch 'rebalance-listener' into deserialization

2 years agoMerge branch 'endless-stream-consumer' into rebalance-listener
Kai Moritz [Sat, 30 Apr 2022 09:24:34 +0000 (11:24 +0200)]
Merge branch 'endless-stream-consumer' into rebalance-listener

2 years agoHealthIndicator implementiert: Implementierung selbst vergessen :/
Kai Moritz [Sat, 30 Apr 2022 09:22:41 +0000 (11:22 +0200)]
HealthIndicator implementiert: Implementierung selbst vergessen :/

2 years agoSpringifiy: Merge und Wiederbelebung des Rebalance-Listeners zur Zählung
Kai Moritz [Fri, 22 Apr 2022 15:59:18 +0000 (17:59 +0200)]
Springifiy: Merge und Wiederbelebung des Rebalance-Listeners zur Zählung

2 years agoRefactor: `EndlessConsumer` ist selbst ein `ConsumerRebalanceListener`
Kai Moritz [Fri, 22 Apr 2022 15:53:41 +0000 (17:53 +0200)]
Refactor: `EndlessConsumer` ist selbst ein `ConsumerRebalanceListener`

* Vorbereitung für die Übernahme der Funktionalität in der springifizierten
  Version.

2 years agoSpringify: Merge der Umstellung auf die Auto-Konfiguration von Spring-Boot
Kai Moritz [Fri, 22 Apr 2022 15:15:36 +0000 (17:15 +0200)]
Springify: Merge der Umstellung auf die Auto-Konfiguration von Spring-Boot

2 years agoSpringify: Der Kafka-`Consumer` wird über die Spring-Factory erzeugt
Kai Moritz [Fri, 22 Apr 2022 09:24:55 +0000 (11:24 +0200)]
Springify: Der Kafka-`Consumer` wird über die Spring-Factory erzeugt

2 years agoSpringify: Merge der Umstellung des Payloads auf JSON
Kai Moritz [Mon, 18 Apr 2022 11:47:40 +0000 (13:47 +0200)]
Springify: Merge der Umstellung des Payloads auf JSON

2 years agoSpringify: Konfiguration erfolgt über `KafkaProperties`
Kai Moritz [Fri, 22 Apr 2022 09:08:37 +0000 (11:08 +0200)]
Springify: Konfiguration erfolgt über `KafkaProperties`

2 years agoSpringify: Nachrichten-Typ wird über den Type-Info-Header bestimmt
Kai Moritz [Sun, 17 Apr 2022 11:44:49 +0000 (13:44 +0200)]
Springify: Nachrichten-Typ wird über den Type-Info-Header bestimmt

2 years agoSpringify: Der Payload ist eine als JSON gerenderte Klasse
Kai Moritz [Sun, 17 Apr 2022 11:33:40 +0000 (13:33 +0200)]
Springify: Der Payload ist eine als JSON gerenderte Klasse

* Als Nachricht wird eine Instanz der Klasse `ClientMessage` erwartet
* Die Instanz wird mit Hilfe des `JsonDeserializer` von Spring Kafka
  deserialisiert.

2 years agoSpringify: Refactor - Aufruf der Assertions vereinfacht
Kai Moritz [Mon, 18 Apr 2022 11:37:33 +0000 (13:37 +0200)]
Springify: Refactor - Aufruf der Assertions vereinfacht

* Warum auch immer: Der Compiler erkennt den Typ des Generics in der
  springifizierten Version nur dann ohne erzwungenen Cast korrekt, wenn
  der `describeAs()`-Aufruf als letzes erfolgt.
* In der nicht springifizierten Version, ist die Aufrufreihenfolge egal:
  Der Compiler erkennt den Typ des Generics unabhängig davon korrekt.
* An ggf. transitiv angezogenen Abhängigkeiten liegt es laut
  `mvn help:effective-pom` nicht. Dies zeigt für beide Versionen (abgesehen
  von dem explizit ergänzten `spring-kafka`) exakt die selben
  Abhängigkeiten.

2 years agoSpringify: Merge des verschärften Tests aus der Vanilla-Version
Kai Moritz [Mon, 18 Apr 2022 10:46:46 +0000 (12:46 +0200)]
Springify: Merge des verschärften Tests aus der Vanilla-Version

* Logik zur Abfrage der Exception wiederbelebt, an der ein über eine
  Poison Pill gestolperter `KafkaConsumer` gestorben ist, damit die
  springifizierte Version den verschärften Test bestehen kann.
* Um an die Exception zu gelangen, musste eine angepasste
  Version des `CommonContainerStoppingExceptionHandler` implementiert
  werden, die sich die Exception, über die der `KafkaConsumer` gestolpert
  ist, merkt.
* Dabei auch den Health-Endpoint wiederbelebt.
* Seltsamer Weise musste dabei der Code für die AssertJ-Assertions
  angepasst werden, obwohl sich die Logik im Testfall und die Signatur der
  getesteten Methode nicht geändert hat. Vielleicht durch eine Änderung in
  den transitiv angezogenen Abhängigkeiten durch das Einbinden von
  Spring Kafka??

2 years agoTests: Refaktorisiert - Durcheinander bei Assertions aufgeräumt
Kai Moritz [Mon, 18 Apr 2022 10:41:47 +0000 (12:41 +0200)]
Tests: Refaktorisiert - Durcheinander bei Assertions aufgeräumt

* Zuvor wurden die Assertions von JUnit Jupiter und AssertJ durcheinander
  gewürfelt verwendet.
* Jetzt werden stringent nur noch die Assertions von AssertJ verwendet.

2 years agoTests: Refaktorisiert - Serialisierung des Payloads konfigurierbar gemacht
Kai Moritz [Sun, 17 Apr 2022 11:15:07 +0000 (13:15 +0200)]
Tests: Refaktorisiert - Serialisierung des Payloads konfigurierbar gemacht

2 years agoSpringify: Die Fehlerbehandlung funktioniert unabhängig vom Batch-Listener
Kai Moritz [Sat, 16 Apr 2022 10:03:19 +0000 (12:03 +0200)]
Springify: Die Fehlerbehandlung funktioniert unabhängig vom Batch-Listener

* Es ist nicht wie zunächst vermutet nötig, einen Batch-Listener zu
  verwenden, um das gewünschte Stop-World-Verhalten für Deserialisierungs-
  Fehler zu erreichen.
* Den Batch-Listener wieder entfernt - der Test bleibt unverändert GRÜN.
* Der von Spring Kafka automatisch erzeugte Listener-Container verwendet
  dann auch automatisch den als Bean definierten Error-Handler.

2 years agoSpringify: GRÜN - Unerwartetes Verhalten lag an Konfigurations-Fehler
Kai Moritz [Fri, 15 Apr 2022 10:55:30 +0000 (12:55 +0200)]
Springify: GRÜN - Unerwartetes Verhalten lag an Konfigurations-Fehler

* Der `KafkaMessageListenerContainer` hatte sich nicht wegen einem
  fehlenden `ErrorHandlingDeserializer` trotz des konfigurierten
  `CommonContainerStoppingErrorHandler` in der Endlosschleife verfangen,
  so wie es die Fehlermeldung suggeriert hatte.
* Der Fehler lag eigentlich daran, dass der Error-Handler zwar erzeugt,
  aber nicht der wegen der Batch-Verarbeitung manuell erzeugten
  `KafkaListenerContainerFactory` übergeben wurde.

2 years agoSpringify: ROT - Merge des verschärften Tests aus der Vanilla-Version
Kai Moritz [Fri, 15 Apr 2022 12:01:07 +0000 (14:01 +0200)]
Springify: ROT - Merge des verschärften Tests aus der Vanilla-Version

* Ohne die Verschärfung des Tests war der Test grün, obwohl die Anwendung
  entgegen den Erwartungen trotz der eingetreuten Poison-Pill _alle_
  Nachrichten gelesen hat.
* Jetzt schlägt der Test wie erwartet fehl.

2 years agoSpringify: GRÜN - `start()`/`stop()` werden im Test explizit aufgerufen
Kai Moritz [Fri, 15 Apr 2022 09:51:08 +0000 (11:51 +0200)]
Springify: GRÜN - `start()`/`stop()` werden im Test explizit aufgerufen

2 years agoSpringify: Die `@PreDestroy`-Methode wird nicht benötigt
Kai Moritz [Fri, 15 Apr 2022 09:30:56 +0000 (11:30 +0200)]
Springify: Die `@PreDestroy`-Methode wird nicht benötigt

* Spring Kafka prüft schon zuvor selbst, ob der Container noch läuft und
  fährt ihn ggf. herunter.

2 years agoSpringify: Start/Stop prüft, ob der Container schon/noch läuft
Kai Moritz [Fri, 15 Apr 2022 09:29:48 +0000 (11:29 +0200)]
Springify: Start/Stop prüft, ob der Container schon/noch läuft

2 years agoSpringify: `start()`/`stop()`/`destroy()` in EndlessConsumer wiederbelebt
Kai Moritz [Fri, 15 Apr 2022 08:56:40 +0000 (10:56 +0200)]
Springify: `start()`/`stop()`/`destroy()` in EndlessConsumer wiederbelebt

* Die Logik für Start/Stop liegt jetzt wieder wie vor der Umstellung in
  dem EndlessConsumer
* Der `ApplicationRunner` kenn den `EndlessConsumer` und ruft für diese
  `start()` auf
* Entsprechende Aufrufe im `DriverController` wiederbelebt.
* Das Stoppen beim Herunterfahren der App wird wieder über eine
  `@PreDestroy`-Methode im `EndlessConsumer` realisiert.

2 years agoSpringify: ROT - Auto Startup in @KafkaListener deaktiviert
Kai Moritz [Fri, 15 Apr 2022 08:39:17 +0000 (10:39 +0200)]
Springify: ROT - Auto Startup in @KafkaListener deaktiviert

* Ziel, näher an die Funktion des Vanilla-EndlessConsumer heranrücken.
  Der Testfall soll möglichst unverändert funktionieren.
* Unklar, ob das für die Schulung hilfreich ist.
* Hilft aber definitiv beim Verstehen der Mechanismen von Spring Kafka
* Hier wurde jetzt erst mal der automatische Start des Containers
  unterbunden und stattdessen `Application` wieder zu einem
  `ApplicationRunner` mit `@PreDestroy`-Methode gemacht.
* TODO: Testfall ist darüber erst mal ROT, da der Container dort jetzt
  nicht gestartet wird! Das war ja aber genau das Ziel, denn bei den
  übernommenen Testfällen gab es (zumindest theoretisch) Race-Conditions,
  weil der Container - anders als bei der Vanilla-Implementierung -  immer
  schon lief.

2 years agoTests: Tests prüfen den Status des Consumers
Kai Moritz [Fri, 15 Apr 2022 11:14:19 +0000 (13:14 +0200)]
Tests: Tests prüfen den Status des Consumers

2 years agoTests: Refaktorisiert - Nachrichten werden für alle Tests aufgezeichnet
Kai Moritz [Fri, 15 Apr 2022 10:22:57 +0000 (12:22 +0200)]
Tests: Refaktorisiert - Nachrichten werden für alle Tests aufgezeichnet

2 years agoTests: Fehlerfall-Test prüft, dass nicht alle Nachrichten gelesen wurden
Kai Moritz [Fri, 15 Apr 2022 10:17:23 +0000 (12:17 +0200)]
Tests: Fehlerfall-Test prüft, dass nicht alle Nachrichten gelesen wurden

2 years agoTests: Assert-Beschreibungen korrigiert/ergänzt
Kai Moritz [Fri, 15 Apr 2022 10:08:50 +0000 (12:08 +0200)]
Tests: Assert-Beschreibungen korrigiert/ergänzt

2 years agoSpringify: BatchListener konfiguriert - Hilft nicht wirklich...
Kai Moritz [Thu, 14 Apr 2022 16:56:04 +0000 (18:56 +0200)]
Springify: BatchListener konfiguriert - Hilft nicht wirklich...

* Unerwartete aber eigentlich logische Folge:
** Es werden weiterhing alle 100 Nachrichten abgeholt
** Der Testfall schlägt trotzdem nicht fehl, da die Erwartung an die
   gespeicherten Offsets dynamisch aus den tatsächlich vom Listener
   bezogenen Nachrichten bestimmt wird.
* Der Effekt tritt ein, weil der `ErrorHandlingDeserializer` ja die
  verursachende Exception schluckt und dafür eine Behandlung des Fehlers
  vornimmt, die den Fehler für die nachfolgenden Komponenten transparenter
  machen soll.
* Es ist aber eher das Gegenteil der Fall, da die Folge ist, dass der
  `poll()`-Aufruf alle Nachrichten - _auch die nach dem Fehler!_ - abholt.
* D.h., es ist nicht mehr so simpel und elegant erkennbar, an welcher
  Stelle die `RecordDeserializationException` aufgetreten ist
* Wenn man - wie hier beabsichtigt - die Konsumption beim Auftreten des
  dieser Sorte Ausnahme-Fehler stoppen und die Offsets für alle _vor_ dem
  Auftreten des Fehlers fehlerfrei empfangenen Nachrichten dafür speichern
  will, dann ist dies ohne eine umständliche Offset-Verwaltung und vielen
  `seek()`-Aufrufen nicht mehr möglich!
* Eine einfache Abhilfe scheint nicht möglich, da der
  `KafkaMessageListenerContainer` nicht damit umgehen kann, wenn _in_ dem
  `poll()` eine Exception geworfen wird und dann - wie beobachtet - in
  einer Endlosschleife hängen bleibt.

2 years agoSpringify: `CommonContainerStoppingErrorHandler` für erwartetes Verhalten
Kai Moritz [Wed, 13 Apr 2022 21:09:49 +0000 (23:09 +0200)]
Springify: `CommonContainerStoppingErrorHandler` für erwartetes Verhalten

* Der `CommonContainerStoppingErrorHandler` stoppt den Container beim
  ersten Auftreten eines Fehlers.
* Dadurch ist das erwartete Verhalten - soweit bisher durch die Tests
  definiert - wiederhergestellt.
* Der Handler wird den Container aber auch bei einem Fehler im Listener
  stoppen, so dass in dem Fall wahrscheinlich noch nachgebessert werden
  muss.

2 years agoSpringify: `ErrorHandlingDeserializer` bricht die Schleife
Kai Moritz [Wed, 13 Apr 2022 20:18:21 +0000 (22:18 +0200)]
Springify: `ErrorHandlingDeserializer` bricht die Schleife

* Die Konfiguration eines `ErrorHandlingDeserializer` durchbricht die
  Endlosschleife.
* Das Default-Verhalten von Spring Kafka ist aber dann in dem Fall, den
  Fehler zu loggen und dann zu skippen.
* Dieses Verhalten wird von dem `DefaultErrorHandler` vorgegeben

2 years agoSpringify: Kernfunktion von EndlessConsumer über Spring-Kafka
Kai Moritz [Tue, 12 Apr 2022 22:38:24 +0000 (00:38 +0200)]
Springify: Kernfunktion von EndlessConsumer über Spring-Kafka

* Alle weiteren Funktionen für dieses erste Experiment erst mal entfernt
* Testfall entsprechend angepasst
* Der Commit passiert hier, weil Spring Kafka per Default den eignen
  Commit-Modus `BATCH` aktiviert, der nach jedem abgearbeiteten `poll()`
  einen (synchronen!) Commit durchführt
* Der Test ist zwar grün, wenn man die App normal startet, verfängt sie
  sich jedoch in einer Endlosschleife, da kein Error-Handler konfiguriert
  ist, der die `RecordDeserializationException` korrekt behandeln kann, so
  dass sich die App in einem Life-Deadlock befindet, in dem sie immer
  wieder den Datensatz erhält, der den Fehler ausgelöst hat, dadurch
  aber nicht wie ein Vanilla-Consumer beendet wird, da Spring Kafka die
  Exception abfängt und weitermacht: neuer `poll()` für die selbe Position,
  so dass die App aus Sicht des GroupCoordinator noch lebt.
* DEBUG-Logging für `org.springframework.kafka` aktiviert, damit man die
  Commits sieht.

2 years agoTests: Geprüft, dass der Fehler nach einem Neustart neu vorliegt
Kai Moritz [Mon, 11 Apr 2022 14:19:44 +0000 (16:19 +0200)]
Tests: Geprüft, dass der Fehler nach einem Neustart neu vorliegt

2 years agoTests: Umbau für einen Commit im Fehlerfall und Anpassung des Tests
Kai Moritz [Sun, 10 Apr 2022 12:50:50 +0000 (14:50 +0200)]
Tests: Umbau für einen Commit im Fehlerfall und Anpassung des Tests

2 years agoTests: Refaktorisiert - Methoden-Reihenfolge aufgeräumt und kommentiert
Kai Moritz [Mon, 11 Apr 2022 08:04:53 +0000 (10:04 +0200)]
Tests: Refaktorisiert - Methoden-Reihenfolge aufgeräumt und kommentiert

2 years agoTests: Der Test wartet, bis die Offsets regulär committed wurden
Kai Moritz [Mon, 11 Apr 2022 07:41:40 +0000 (09:41 +0200)]
Tests: Der Test wartet, bis die Offsets regulär committed wurden

* Zuvor wurde der Offset-Commit erzwungen, indem der EndlessConsumer
  über den Aufruf von `stop()` beendet wurde.
* Jetzt wird die Überprüfung der Erwartungen über awaitility aufgeschoben,
  bis die Erwartungen beobachtet werden können - oder eine Zeitschranke
  gerissen wird.

2 years agoTests: Der EndlessConsumer wird jetzt doch asynchron ausgeführt
Kai Moritz [Sun, 10 Apr 2022 20:50:44 +0000 (22:50 +0200)]
Tests: Der EndlessConsumer wird jetzt doch asynchron ausgeführt

* Der Test-Code wird verständlicher, wenn der Consumer asynchron läuft
* Für die Überprüfung der Erwartungen wird dann awaitility benötigt
** Da der Consumer in einem separaten Thread läuft, muss auf diesen
   gezielt gewartet werden
** Dafür wird es deutlicher/verständlicher, auf welche der auch aus dem
   regulären Betrieb des EndlessConsumer bekannten Zustandsänderungen
   der Test wartet
* Da die Offsets für die Controlle (ggf.) abgefragt werden, während der
  EndlessConsumer noch läuft, wird ein separater KafkaConsumer benötigt,
  um die Offsets abzurufen.
  BEACHTE: Um die Änderungen in dem Offsets-Topic zu greifen zu bekommen,
  muss dieser KafkaConsumer für jede Abfrage ein `assign()` durchführen,
  damit er gezwungen ist, die Offsets neu vom Broker zu erfragen.

2 years agoTests: Refaktorisiert - DRY für Test auf Offset-Fortschritt
Kai Moritz [Sun, 10 Apr 2022 13:54:26 +0000 (15:54 +0200)]
Tests: Refaktorisiert - DRY für Test auf Offset-Fortschritt

2 years agoTests: Überprüft, ob überhaupt ein Offset-Fortschritt vorliegt
Kai Moritz [Sun, 10 Apr 2022 13:25:58 +0000 (15:25 +0200)]
Tests: Überprüft, ob überhaupt ein Offset-Fortschritt vorliegt

2 years agoTests: Offsets werden unter TopicPartition abgelegt
Kai Moritz [Sun, 10 Apr 2022 13:25:57 +0000 (15:25 +0200)]
Tests: Offsets werden unter TopicPartition abgelegt

2 years agoTests: Test-Reihenfolge definiert, da das Topic nicht geleert wird
Kai Moritz [Sun, 10 Apr 2022 14:02:07 +0000 (16:02 +0200)]
Tests: Test-Reihenfolge definiert, da das Topic nicht geleert wird

2 years agoTests: Erste Version eines synchronen Integration-Test implementiert
Kai Moritz [Sat, 9 Apr 2022 14:14:47 +0000 (16:14 +0200)]
Tests: Erste Version eines synchronen Integration-Test implementiert

2 years agoDemonstration der RecordDeserializationException
Kai Moritz [Sat, 9 Apr 2022 16:30:22 +0000 (18:30 +0200)]
Demonstration der RecordDeserializationException

* EndlessConsumer wird mit dem Value-Type `Long` anstatt `String` erzeugt
* Setup für die Demonstration der DeserializationException überarbeitet

2 years agoRefaktorisierung für Tests - EndlessConsumer typisiert
Kai Moritz [Sat, 9 Apr 2022 14:04:02 +0000 (16:04 +0200)]
Refaktorisierung für Tests - EndlessConsumer typisiert

2 years agoRefaktorisierung für Tests - Record-Handler als Bean konfigurierbar
Kai Moritz [Sat, 9 Apr 2022 11:40:47 +0000 (13:40 +0200)]
Refaktorisierung für Tests - Record-Handler als Bean konfigurierbar

* Ein Handler für die Verarbeitung der einzelnen ConsumerRecord's kann
  jetzt ein `java.util.function.Consumer` als Bean definiert werden
* Die Nachricht wird erst nach dem Aufruf des Handlers als konsumiert
  gezählt, damit der Handler die Nachricht über eine Exception ablehnen
  kann.

2 years agoDie Spring-Boot App wird nun sauber herunter gefahren
Kai Moritz [Mon, 11 Apr 2022 08:51:01 +0000 (10:51 +0200)]
Die Spring-Boot App wird nun sauber herunter gefahren

* Zuvor wurde die App nicht richtig beendet, weil der ExecutorService nicht
  beendet wurde und sich die JVM deswegen nicht beenden konnte.
* Dies ist erst durch die Aktivierung des shutdown-Endpoints aufgefallen.
* Jetzt wurde in `Application` eine `@PreDestroy`-Methode ergänzt, die
  den ExecutorService nach allen Regeln der Kunst ordentlich beendet.

2 years agoRefaktorisierung für Tests - ExecutorService als separate Bean erzeugt
Kai Moritz [Sat, 9 Apr 2022 11:21:43 +0000 (13:21 +0200)]
Refaktorisierung für Tests - ExecutorService als separate Bean erzeugt

2 years agoRefaktorisierung für Tests - Start des EndlessConsumer in ApplicationRunner
Kai Moritz [Sat, 9 Apr 2022 09:36:29 +0000 (11:36 +0200)]
Refaktorisierung für Tests - Start des EndlessConsumer in ApplicationRunner

* Bisher wurde der EndlessConsumer beim erzeugen der Bean gestartet
* Dies ist für das Aufsetzen von Tests ungünstig, da die erzeugte Bean
  dort nicht unbedingt direkt gestartet werden soll
* Daher wurde der Start des EndlessConsumer in einen ApplicationRunner
  ausgelagert

2 years agoRefaktorisierung für Tests - Bean-Definition in Config-Klasse verschoben
Kai Moritz [Sat, 9 Apr 2022 09:46:51 +0000 (11:46 +0200)]
Refaktorisierung für Tests - Bean-Definition in Config-Klasse verschoben

2 years agoRefaktorisierung für Tests - KafkaConsumer als eigenständige Bean
Kai Moritz [Sat, 9 Apr 2022 09:21:43 +0000 (11:21 +0200)]
Refaktorisierung für Tests - KafkaConsumer als eigenständige Bean

* Der KafakConsumer wird als eigenständige Bean erzeugt
* Die Bean wird dem EndlessConsumer im Konstruktor übergeben
* Dafür muss der Lebenszyklus der KafkaConsumer-Bean von dem der
  EndlessConsumer-Bean getrennt werden:
** close() darf nicht mehr im finally-Block im EndlessConsumer aufgerufen
   werden
** Stattdessen muss close() als Destry-Methode der Bean definiert werden
** Für start/stop muss stattdessen unsubscribe() im finally-Block aufgerufen
   werden
** Da unsubscribe() die Offset-Position nicht commited, muss explizit
   ein Offsset-Commit beauftragt werden, wenn der Consumer regulär
   gestoppt wird (WakeupException)

2 years agoDie Offsets werden ausgegeben, wenn eine Partition zugeteilt/entzogen wird
Kai Moritz [Sat, 9 Apr 2022 16:29:44 +0000 (18:29 +0200)]
Die Offsets werden ausgegeben, wenn eine Partition zugeteilt/entzogen wird

2 years agoGesehene Schlüssel sollten als long gezählt werden
Kai Moritz [Sat, 9 Apr 2022 16:00:48 +0000 (18:00 +0200)]
Gesehene Schlüssel sollten als long gezählt werden

2 years agoMerge branch 'endless-stream-consumer' into rebalance-listener
Kai Moritz [Sun, 10 Apr 2022 20:15:34 +0000 (22:15 +0200)]
Merge branch 'endless-stream-consumer' into rebalance-listener

2 years agoHealthIndicator implementiert
Kai Moritz [Sun, 10 Apr 2022 19:56:37 +0000 (21:56 +0200)]
HealthIndicator implementiert

2 years agoshutdown-Endpoint aktiviert
Kai Moritz [Sun, 10 Apr 2022 18:55:56 +0000 (20:55 +0200)]
shutdown-Endpoint aktiviert

2 years agoInformationen zur Kafka-Konfiguration im info-Endpoint sichtbar gemacht
Kai Moritz [Sun, 10 Apr 2022 18:55:42 +0000 (20:55 +0200)]
Informationen zur Kafka-Konfiguration im info-Endpoint sichtbar gemacht

2 years agoInformationen zur Java-Umgebung im info-Endpoint aktiviert
Kai Moritz [Sun, 10 Apr 2022 18:55:13 +0000 (20:55 +0200)]
Informationen zur Java-Umgebung im info-Endpoint aktiviert

2 years agoDas Git-Commit-Id-Plugin generiert eine git.properties
Kai Moritz [Sun, 10 Apr 2022 18:27:49 +0000 (20:27 +0200)]
Das Git-Commit-Id-Plugin generiert eine git.properties

2 years agoDas Spring-Boot-Maven-Plugin generiert Build-Info
Kai Moritz [Sun, 10 Apr 2022 18:25:28 +0000 (20:25 +0200)]
Das Spring-Boot-Maven-Plugin generiert Build-Info

2 years agoDefault-Konfiguration überarbeitet
Kai Moritz [Sun, 10 Apr 2022 17:52:09 +0000 (19:52 +0200)]
Default-Konfiguration überarbeitet

* Eine über die IDE bzw. Maven gestartete Instanz soll klar als solche
  erkennbar sein (`client.id` = DEV), darf dafür aber im Compose-Setup
  mitspielen (`group.id` = my-group).
* Bisher gab es häufig Port-Konflikte, wenn über die IDE bzw. über Maven
  parallel zu einem Compose-Setup eine Instanz gestartet wurde. Daher wird
  jetzt hier explizit auf einen abweichenden Port (8881) ausgewichen.
* Im Compose-Setup auch den neuen Port für den Producer übernommen

2 years agoFalschen Status-Code bei start/stop-Fehler korrigiert
Kai Moritz [Sun, 10 Apr 2022 17:49:44 +0000 (19:49 +0200)]
Falschen Status-Code bei start/stop-Fehler korrigiert

2 years agoVerständliche Fehlermeldungen für DriverController
Kai Moritz [Fri, 25 Mar 2022 14:20:00 +0000 (15:20 +0100)]
Verständliche Fehlermeldungen für DriverController

2 years agoMerge branch 'endless-stream-consumer' into rebalance-listener
Kai Moritz [Sun, 10 Apr 2022 17:03:38 +0000 (19:03 +0200)]
Merge branch 'endless-stream-consumer' into rebalance-listener

2 years agoThread-Synchronisation bei start/stop überarbeitet
Kai Moritz [Sun, 10 Apr 2022 16:58:16 +0000 (18:58 +0200)]
Thread-Synchronisation bei start/stop überarbeitet

2 years agoFehlerbehandlung in EndlessConsumer.destroy() korrigiert
Kai Moritz [Sat, 9 Apr 2022 16:50:43 +0000 (18:50 +0200)]
Fehlerbehandlung in EndlessConsumer.destroy() korrigiert

2 years agoValidierung erfolgt über spring-boot-starter-validation
Kai Moritz [Sat, 9 Apr 2022 07:52:59 +0000 (09:52 +0200)]
Validierung erfolgt über spring-boot-starter-validation

2 years agoMerge branch rebalance-listener into counting-consumer
Kai Moritz [Fri, 8 Apr 2022 10:44:37 +0000 (12:44 +0200)]
Merge branch rebalance-listener into counting-consumer

* Nur wegen der Rück-Umbenennung des counting-consumer in endless-consumer
* Die anderen Änderungen aus dem Branch wurden entfernt

2 years agoRückbau auf verschachtelte Maps
Kai Moritz [Thu, 7 Apr 2022 07:25:41 +0000 (09:25 +0200)]
Rückbau auf verschachtelte Maps

* Die zuvor erfundenen fachlichen Klassen passen nicht zu dazu, dass man
  sie - wie gedacht - direkt in MongoDB stopfen könnte.
* Daher hier erst mal der Rückbau auf Maps, da das dan für die Übungen
  einfacher ist.

2 years agocounting-consumer ist eine Verbesserung von endless-consumer
Kai Moritz [Wed, 6 Apr 2022 17:51:35 +0000 (19:51 +0200)]
counting-consumer ist eine Verbesserung von endless-consumer

2 years agoBenennung der neuen fachlichen Klassen verbessert
Kai Moritz [Wed, 6 Apr 2022 06:45:57 +0000 (08:45 +0200)]
Benennung der neuen fachlichen Klassen verbessert

2 years agoVerschachtelte Map gegen fachliche Datenstruktur ausgetauscht
Kai Moritz [Wed, 6 Apr 2022 06:34:18 +0000 (08:34 +0200)]
Verschachtelte Map gegen fachliche Datenstruktur ausgetauscht

2 years agoReport über gesehene Schlüssel wiederbelebt
Kai Moritz [Tue, 5 Apr 2022 20:37:55 +0000 (22:37 +0200)]
Report über gesehene Schlüssel wiederbelebt

* An der alten Stelle war die Map ja jetzt nur noch leer, da dem Consumer
  zu dem Zeitpunkt, an dem die gesehen Schlüssel ausgegeben wurden, bereits
  alle Partitionen entzogen worden sind.
* Daher werden die gesehenen Schlüssel jetzt ausgegeben, wenn eine
  Partition entzogen wird.

2 years agoRebalance-Listener anstatt Wegwerfen der Map
Kai Moritz [Tue, 5 Apr 2022 20:25:47 +0000 (22:25 +0200)]
Rebalance-Listener anstatt Wegwerfen der Map

2 years agoHTTPie gibt nicht nur den Response, sondern auch den Request aus
Kai Moritz [Sun, 3 Apr 2022 10:19:47 +0000 (12:19 +0200)]
HTTPie gibt nicht nur den Response, sondern auch den Request aus

2 years agoMerge des Upgrades der Confluent-Images auf 7.0.2
Kai Moritz [Sun, 3 Apr 2022 06:19:26 +0000 (08:19 +0200)]
Merge des Upgrades der Confluent-Images auf 7.0.2

2 years agoMerge des Upgrades der Confluent-Images auf 7.0.2
Kai Moritz [Sun, 3 Apr 2022 06:18:08 +0000 (08:18 +0200)]
Merge des Upgrades der Confluent-Images auf 7.0.2

2 years agoUpgrade der Images von Confluent 6.2.0 auf 7.0.2
Kai Moritz [Sun, 3 Apr 2022 06:15:30 +0000 (08:15 +0200)]
Upgrade der Images von Confluent 6.2.0 auf 7.0.2

2 years agoDer Consumer erkennt die Änderung der Partitionierung schneller
Kai Moritz [Sat, 2 Apr 2022 15:18:06 +0000 (17:18 +0200)]
Der Consumer erkennt die Änderung der Partitionierung schneller

* `metadata.max.age.ms` auf 1000 heruntergesetzt (Default 5 min)
* Dadurch wird die Änderung der Anzahl an Partitionen zeitnah erkannt

2 years agoFür null-Keys wird der String NULL gezählt
Kai Moritz [Sat, 2 Apr 2022 13:11:40 +0000 (15:11 +0200)]
Für null-Keys wird der String NULL gezählt

2 years agoREADME.sh zeigt die Auswirkung einer geänderten Partitions-Anzahl
Kai Moritz [Sat, 2 Apr 2022 08:42:05 +0000 (10:42 +0200)]
README.sh zeigt die Auswirkung einer geänderten Partitions-Anzahl

2 years agoDer Consumer zählt jetzt die Nachrichten pro Key für jedes Topic
Kai Moritz [Fri, 1 Apr 2022 20:02:28 +0000 (22:02 +0200)]
Der Consumer zählt jetzt die Nachrichten pro Key für jedes Topic

* Die Ergebnisse werden beim Beenden des Consumer ausgegeben
* Wenn der Consumer neu gestartet wird, werden die Ergebnisse zurückgesetzt
* Über /seen können Zwischenstände abgefragt werden

2 years ago`auto.offset.reset` konfigurierbar gemacht
Kai Moritz [Fri, 1 Apr 2022 09:44:22 +0000 (11:44 +0200)]
`auto.offset.reset` konfigurierbar gemacht

2 years agoFehler bei der Erzeugung des KafkaConsumer werden nicht mehr verschluckt
Kai Moritz [Fri, 1 Apr 2022 09:40:14 +0000 (11:40 +0200)]
Fehler bei der Erzeugung des KafkaConsumer werden nicht mehr verschluckt

* Beim Erzeugen der Properties-Instanz können Exceptions fliegen
* Beim Erzeugen der KafkaConsumer-Instanz können Exception fliegen
* Daher wurden diese Schritte in den try/catch-Block verlegt
* Neben der Nachricht wird jetzt auch der ganze Stack-Trace gelogged
* Da die Erzeugung des KafkaConsumer jetzt im try/catch-Block geschieht,
  wird der EndlessConsumer im Fehlerfall korrekt als beendet markiert