Auf `@KafkaHandler` umgestellt
authorKai Moritz <kai@juplo.de>
Sun, 4 Sep 2022 17:30:29 +0000 (19:30 +0200)
committerKai Moritz <kai@juplo.de>
Sat, 10 Sep 2022 16:08:45 +0000 (18:08 +0200)
commitf095f71a104fcde025a63f87ba75eb5cb3136656
tree94a1a61876caeb1b8b9362a87629ecc0fd15ba7f
parent2eb3c45c9438a20777b0110defa593dd45c64511
Auf `@KafkaHandler` umgestellt

* Die Autoconfiguration über die Annotation `@EnableKafka` aktiviert
* Da die Autoconfiguration von Spring Kafka zieht, vereinfacht sich die
  Konfiguration:
** Spring Kafka erzeugt den benötigten `MessageListenerContainer`, der
   für die Anbindung der mit `@KafkaHandler` annotierten Methode
   benötigt wird automatisch und versorgt ihn mit einem passenden
   `KafkaConsumer`, so dass letzterer nicht mehr explizit erzeugt werden
   muss.
** Der Scheduler wird von Spring Kafka erzeugt und verwaltet, so dass
   nicht mehr explizit ein `ExecutorService` erzeugt und beendet werden
   muss.
** Der Rebalance-Listener wird automatisch eingebunden, der
   `ApplicationRebalanceListener` muss allerdings von der richtigen
   Spring-Klasse ableiten, damit er von der Autoconfiguration gefunden
   wird.
* Um das von dem Testfall erwartete Default-Verhalten des `KafkaConsumer`
  mit dem `MessageListenerContainer` zu simulieren, musste ein
  angepasster `ErrorHandler` implementiert werden.
* Der Code zum Exception-Handling und zum Schließen des `KafkaConsumer`
  in `EndlessConsumer` entfällt.
* Der Code zum Starten/Stoppen in `EndlessConsumer` kann einfach die
  entsprechende Methoden des `MessageListenerContainers aufrufen. Dafür
  muss er allerdings erst die passende Instanz aus einer Registry über
  die Client-ID erfragen.
* Der Testfall musste an die Autoconfiguration angepasst werden:
** Die `KafkaAutoConfiguration` muss hier explizit eingebunden werden.
** Da auch der Test einen `KafkaConsumer` benötigt, muss die in der
   Anwendung nicht mehr benötigte Factory jetzt explizit für den Test
   bereitgestellt werden.
README.sh
src/main/java/de/juplo/kafka/Application.java
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/ApplicationErrorHandler.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java
src/main/java/de/juplo/kafka/EndlessConsumer.java
src/test/java/de/juplo/kafka/GenericApplicationTests.java