]> juplo.de Git - demos/kafka/training/commitdiff
Springify: ROT - Auto Startup in @KafkaListener deaktiviert
authorKai Moritz <kai@juplo.de>
Fri, 15 Apr 2022 08:39:17 +0000 (10:39 +0200)
committerKai Moritz <kai@juplo.de>
Fri, 15 Apr 2022 11:56:18 +0000 (13:56 +0200)
* Ziel, näher an die Funktion des Vanilla-EndlessConsumer heranrücken.
  Der Testfall soll möglichst unverändert funktionieren.
* Unklar, ob das für die Schulung hilfreich ist.
* Hilft aber definitiv beim Verstehen der Mechanismen von Spring Kafka
* Hier wurde jetzt erst mal der automatische Start des Containers
  unterbunden und stattdessen `Application` wieder zu einem
  `ApplicationRunner` mit `@PreDestroy`-Methode gemacht.
* TODO: Testfall ist darüber erst mal ROT, da der Container dort jetzt
  nicht gestartet wird! Das war ja aber genau das Ziel, denn bei den
  übernommenen Testfällen gab es (zumindest theoretisch) Race-Conditions,
  weil der Container - anders als bei der Vanilla-Implementierung -  immer
  schon lief.

src/main/java/de/juplo/kafka/Application.java
src/main/java/de/juplo/kafka/EndlessConsumer.java

index 76ba717bb9e9e00dc2af475cf9e3a01e6b728bbf..2994762e699f8762f05c712f80fb2f7a23772937 100644 (file)
@@ -1,14 +1,42 @@
 package de.juplo.kafka;
 
 import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.ApplicationArguments;
+import org.springframework.boot.ApplicationRunner;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
+
+import javax.annotation.PreDestroy;
 
 
 @SpringBootApplication
 @Slf4j
-public class Application
+public class Application implements ApplicationRunner
 {
+  @Autowired
+  KafkaListenerEndpointRegistry registry;
+  @Value("${consumer.client-id}")
+  String clientId;
+
+
+  @Override
+  public void run(ApplicationArguments args) throws Exception
+  {
+    log.info("Starting EndlessConsumer");
+    this.registry.getListenerContainer(clientId).start();
+  }
+
+  @PreDestroy
+  public void stopExecutor()
+  {
+    log.info("Stopping EndlessConsumer");
+    this.registry.getListenerContainer(clientId).stop();
+  }
+
+
   public static void main(String[] args)
   {
     SpringApplication.run(Application.class, args);
index 929bdbd5ccf659c20cf86005686ed94406d7d7c6..472e07de08f8b28931c2f154ba701d565b9deae7 100644 (file)
@@ -23,7 +23,12 @@ public class EndlessConsumer<K, V>
   Consumer<ConsumerRecord<K, V>> handler;
 
 
-  @KafkaListener(topics = "${consumer.topic}", containerFactory = "batchFactory")
+  @KafkaListener(
+      id = "${consumer.client-id}",
+      idIsGroup = false,
+      topics = "${consumer.topic}",
+      containerFactory = "batchFactory",
+      autoStartup = "false")
   public void receive(List<ConsumerRecord<K, V>> records)
   {
     // Do something with the data...