Springify: ROT - Auto Startup in @KafkaListener deaktiviert
authorKai Moritz <kai@juplo.de>
Fri, 15 Apr 2022 08:39:17 +0000 (10:39 +0200)
committerKai Moritz <kai@juplo.de>
Fri, 15 Apr 2022 11:56:18 +0000 (13:56 +0200)
* Ziel, näher an die Funktion des Vanilla-EndlessConsumer heranrücken.
  Der Testfall soll möglichst unverändert funktionieren.
* Unklar, ob das für die Schulung hilfreich ist.
* Hilft aber definitiv beim Verstehen der Mechanismen von Spring Kafka
* Hier wurde jetzt erst mal der automatische Start des Containers
  unterbunden und stattdessen `Application` wieder zu einem
  `ApplicationRunner` mit `@PreDestroy`-Methode gemacht.
* TODO: Testfall ist darüber erst mal ROT, da der Container dort jetzt
  nicht gestartet wird! Das war ja aber genau das Ziel, denn bei den
  übernommenen Testfällen gab es (zumindest theoretisch) Race-Conditions,
  weil der Container - anders als bei der Vanilla-Implementierung -  immer
  schon lief.

src/main/java/de/juplo/kafka/Application.java
src/main/java/de/juplo/kafka/EndlessConsumer.java

index 76ba717..2994762 100644 (file)
@@ -1,14 +1,42 @@
 package de.juplo.kafka;
 
 import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.ApplicationArguments;
+import org.springframework.boot.ApplicationRunner;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
+
+import javax.annotation.PreDestroy;
 
 
 @SpringBootApplication
 @Slf4j
-public class Application
+public class Application implements ApplicationRunner
 {
+  @Autowired
+  KafkaListenerEndpointRegistry registry;
+  @Value("${consumer.client-id}")
+  String clientId;
+
+
+  @Override
+  public void run(ApplicationArguments args) throws Exception
+  {
+    log.info("Starting EndlessConsumer");
+    this.registry.getListenerContainer(clientId).start();
+  }
+
+  @PreDestroy
+  public void stopExecutor()
+  {
+    log.info("Stopping EndlessConsumer");
+    this.registry.getListenerContainer(clientId).stop();
+  }
+
+
   public static void main(String[] args)
   {
     SpringApplication.run(Application.class, args);
index 929bdbd..472e07d 100644 (file)
@@ -23,7 +23,12 @@ public class EndlessConsumer<K, V>
   Consumer<ConsumerRecord<K, V>> handler;
 
 
-  @KafkaListener(topics = "${consumer.topic}", containerFactory = "batchFactory")
+  @KafkaListener(
+      id = "${consumer.client-id}",
+      idIsGroup = false,
+      topics = "${consumer.topic}",
+      containerFactory = "batchFactory",
+      autoStartup = "false")
   public void receive(List<ConsumerRecord<K, V>> records)
   {
     // Do something with the data...