From e446dd0b69167688f77bcc2a51fa551ab61916dc Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Fri, 15 Apr 2022 10:56:40 +0200 Subject: [PATCH] Springify: `start()`/`stop()`/`destroy()` in EndlessConsumer wiederbelebt MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit * Die Logik für Start/Stop liegt jetzt wieder wie vor der Umstellung in dem EndlessConsumer * Der `ApplicationRunner` kenn den `EndlessConsumer` und ruft für diese `start()` auf * Entsprechende Aufrufe im `DriverController` wiederbelebt. * Das Stoppen beim Herunterfahren der App wird wieder über eine `@PreDestroy`-Methode im `EndlessConsumer` realisiert. --- src/main/java/de/juplo/kafka/Application.java | 17 ++--------- .../java/de/juplo/kafka/DriverController.java | 7 ++++- .../java/de/juplo/kafka/EndlessConsumer.java | 28 +++++++++++++++++++ 3 files changed, 36 insertions(+), 16 deletions(-) diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java index 2994762..f227bbe 100644 --- a/src/main/java/de/juplo/kafka/Application.java +++ b/src/main/java/de/juplo/kafka/Application.java @@ -2,14 +2,10 @@ package de.juplo.kafka; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.kafka.config.KafkaListenerEndpointRegistry; - -import javax.annotation.PreDestroy; @SpringBootApplication @@ -17,23 +13,14 @@ import javax.annotation.PreDestroy; public class Application implements ApplicationRunner { @Autowired - KafkaListenerEndpointRegistry registry; - @Value("${consumer.client-id}") - String clientId; + EndlessConsumer endlessConsumer; @Override public void run(ApplicationArguments args) throws Exception { log.info("Starting EndlessConsumer"); - this.registry.getListenerContainer(clientId).start(); - } - - @PreDestroy - public void stopExecutor() - { - log.info("Stopping EndlessConsumer"); - this.registry.getListenerContainer(clientId).stop(); + endlessConsumer.start(); } diff --git a/src/main/java/de/juplo/kafka/DriverController.java b/src/main/java/de/juplo/kafka/DriverController.java index 8ca3e2a..480e7d1 100644 --- a/src/main/java/de/juplo/kafka/DriverController.java +++ b/src/main/java/de/juplo/kafka/DriverController.java @@ -17,14 +17,19 @@ import java.util.concurrent.ExecutionException; @RequiredArgsConstructor public class DriverController { + private final EndlessConsumer consumer; + + @PostMapping("start") public void start() { + consumer.start(); } @PostMapping("stop") - public void stop() throws ExecutionException, InterruptedException + public void stop() { + consumer.stop(); } @GetMapping("seen") diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index 472e07d..ea899cc 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -6,8 +6,10 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.config.KafkaListenerEndpointRegistry; import org.springframework.stereotype.Component; +import javax.annotation.PreDestroy; import java.util.List; import java.util.function.Consumer; @@ -17,11 +19,14 @@ import java.util.function.Consumer; @RequiredArgsConstructor public class EndlessConsumer { + @Autowired + private KafkaListenerEndpointRegistry registry; @Value("${consumer.client-id}") String id; @Autowired Consumer> handler; + private long consumed = 0; @KafkaListener( id = "${consumer.client-id}", @@ -46,6 +51,29 @@ public class EndlessConsumer ); handler.accept(record); + + consumed++; } } + + + public synchronized void start() + { + log.info("{} - Starting - consumed {} messages before", id, consumed); + registry.getListenerContainer(id).start(); + } + + public synchronized void stop() + { + log.info("{} - Stopping", id); + registry.getListenerContainer(id).stop(); + log.info("{} - Stopped - consumed {} messages so far", id, consumed); + } + + @PreDestroy + public void destroy() + { + log.info("{} - Destroy!", id); + stop(); + } } -- 2.20.1