From ad1f139b638e40e3c116ded9b5bfad911f0f00b8 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Fri, 15 Apr 2022 10:39:17 +0200 Subject: [PATCH] Springify: ROT - Auto Startup in @KafkaListener deaktiviert MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit * Ziel, näher an die Funktion des Vanilla-EndlessConsumer heranrücken. Der Testfall soll möglichst unverändert funktionieren. * Unklar, ob das für die Schulung hilfreich ist. * Hilft aber definitiv beim Verstehen der Mechanismen von Spring Kafka * Hier wurde jetzt erst mal der automatische Start des Containers unterbunden und stattdessen `Application` wieder zu einem `ApplicationRunner` mit `@PreDestroy`-Methode gemacht. * TODO: Testfall ist darüber erst mal ROT, da der Container dort jetzt nicht gestartet wird! Das war ja aber genau das Ziel, denn bei den übernommenen Testfällen gab es (zumindest theoretisch) Race-Conditions, weil der Container - anders als bei der Vanilla-Implementierung - immer schon lief. --- src/main/java/de/juplo/kafka/Application.java | 30 ++++++++++++++++++- .../java/de/juplo/kafka/EndlessConsumer.java | 7 ++++- 2 files changed, 35 insertions(+), 2 deletions(-) diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java index 76ba717..2994762 100644 --- a/src/main/java/de/juplo/kafka/Application.java +++ b/src/main/java/de/juplo/kafka/Application.java @@ -1,14 +1,42 @@ package de.juplo.kafka; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.kafka.config.KafkaListenerEndpointRegistry; + +import javax.annotation.PreDestroy; @SpringBootApplication @Slf4j -public class Application +public class Application implements ApplicationRunner { + @Autowired + KafkaListenerEndpointRegistry registry; + @Value("${consumer.client-id}") + String clientId; + + + @Override + public void run(ApplicationArguments args) throws Exception + { + log.info("Starting EndlessConsumer"); + this.registry.getListenerContainer(clientId).start(); + } + + @PreDestroy + public void stopExecutor() + { + log.info("Stopping EndlessConsumer"); + this.registry.getListenerContainer(clientId).stop(); + } + + public static void main(String[] args) { SpringApplication.run(Application.class, args); diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index 929bdbd..472e07d 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -23,7 +23,12 @@ public class EndlessConsumer Consumer> handler; - @KafkaListener(topics = "${consumer.topic}", containerFactory = "batchFactory") + @KafkaListener( + id = "${consumer.client-id}", + idIsGroup = false, + topics = "${consumer.topic}", + containerFactory = "batchFactory", + autoStartup = "false") public void receive(List> records) { // Do something with the data... -- 2.20.1