From: Kai Moritz Date: Fri, 15 Apr 2022 08:39:17 +0000 (+0200) Subject: Springify: ROT - Auto Startup in @KafkaListener deaktiviert X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;ds=sidebyside;h=ad1f139b638e40e3c116ded9b5bfad911f0f00b8;p=demos%2Fkafka%2Ftraining Springify: ROT - Auto Startup in @KafkaListener deaktiviert * Ziel, näher an die Funktion des Vanilla-EndlessConsumer heranrücken. Der Testfall soll möglichst unverändert funktionieren. * Unklar, ob das für die Schulung hilfreich ist. * Hilft aber definitiv beim Verstehen der Mechanismen von Spring Kafka * Hier wurde jetzt erst mal der automatische Start des Containers unterbunden und stattdessen `Application` wieder zu einem `ApplicationRunner` mit `@PreDestroy`-Methode gemacht. * TODO: Testfall ist darüber erst mal ROT, da der Container dort jetzt nicht gestartet wird! Das war ja aber genau das Ziel, denn bei den übernommenen Testfällen gab es (zumindest theoretisch) Race-Conditions, weil der Container - anders als bei der Vanilla-Implementierung - immer schon lief. --- diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java index 76ba717..2994762 100644 --- a/src/main/java/de/juplo/kafka/Application.java +++ b/src/main/java/de/juplo/kafka/Application.java @@ -1,14 +1,42 @@ package de.juplo.kafka; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.kafka.config.KafkaListenerEndpointRegistry; + +import javax.annotation.PreDestroy; @SpringBootApplication @Slf4j -public class Application +public class Application implements ApplicationRunner { + @Autowired + KafkaListenerEndpointRegistry registry; + @Value("${consumer.client-id}") + String clientId; + + + @Override + public void run(ApplicationArguments args) throws Exception + { + log.info("Starting EndlessConsumer"); + this.registry.getListenerContainer(clientId).start(); + } + + @PreDestroy + public void stopExecutor() + { + log.info("Stopping EndlessConsumer"); + this.registry.getListenerContainer(clientId).stop(); + } + + public static void main(String[] args) { SpringApplication.run(Application.class, args); diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index 929bdbd..472e07d 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -23,7 +23,12 @@ public class EndlessConsumer Consumer> handler; - @KafkaListener(topics = "${consumer.topic}", containerFactory = "batchFactory") + @KafkaListener( + id = "${consumer.client-id}", + idIsGroup = false, + topics = "${consumer.topic}", + containerFactory = "batchFactory", + autoStartup = "false") public void receive(List> records) { // Do something with the data...