Springify: `start()`/`stop()`/`destroy()` in EndlessConsumer wiederbelebt
authorKai Moritz <kai@juplo.de>
Fri, 15 Apr 2022 08:56:40 +0000 (10:56 +0200)
committerKai Moritz <kai@juplo.de>
Fri, 15 Apr 2022 11:56:25 +0000 (13:56 +0200)
* Die Logik für Start/Stop liegt jetzt wieder wie vor der Umstellung in
  dem EndlessConsumer
* Der `ApplicationRunner` kenn den `EndlessConsumer` und ruft für diese
  `start()` auf
* Entsprechende Aufrufe im `DriverController` wiederbelebt.
* Das Stoppen beim Herunterfahren der App wird wieder über eine
  `@PreDestroy`-Methode im `EndlessConsumer` realisiert.

src/main/java/de/juplo/kafka/Application.java
src/main/java/de/juplo/kafka/DriverController.java
src/main/java/de/juplo/kafka/EndlessConsumer.java

index 2994762..f227bbe 100644 (file)
@@ -2,14 +2,10 @@ package de.juplo.kafka;
 
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.ApplicationArguments;
 import org.springframework.boot.ApplicationRunner;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
-
-import javax.annotation.PreDestroy;
 
 
 @SpringBootApplication
@@ -17,23 +13,14 @@ import javax.annotation.PreDestroy;
 public class Application implements ApplicationRunner
 {
   @Autowired
-  KafkaListenerEndpointRegistry registry;
-  @Value("${consumer.client-id}")
-  String clientId;
+  EndlessConsumer endlessConsumer;
 
 
   @Override
   public void run(ApplicationArguments args) throws Exception
   {
     log.info("Starting EndlessConsumer");
-    this.registry.getListenerContainer(clientId).start();
-  }
-
-  @PreDestroy
-  public void stopExecutor()
-  {
-    log.info("Stopping EndlessConsumer");
-    this.registry.getListenerContainer(clientId).stop();
+    endlessConsumer.start();
   }
 
 
index 8ca3e2a..480e7d1 100644 (file)
@@ -17,14 +17,19 @@ import java.util.concurrent.ExecutionException;
 @RequiredArgsConstructor
 public class DriverController
 {
+  private final EndlessConsumer consumer;
+
+
   @PostMapping("start")
   public void start()
   {
+    consumer.start();
   }
 
   @PostMapping("stop")
-  public void stop() throws ExecutionException, InterruptedException
+  public void stop()
   {
+    consumer.stop();
   }
 
   @GetMapping("seen")
index 472e07d..ea899cc 100644 (file)
@@ -6,8 +6,10 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
 import org.springframework.stereotype.Component;
 
+import javax.annotation.PreDestroy;
 import java.util.List;
 import java.util.function.Consumer;
 
@@ -17,11 +19,14 @@ import java.util.function.Consumer;
 @RequiredArgsConstructor
 public class EndlessConsumer<K, V>
 {
+  @Autowired
+  private KafkaListenerEndpointRegistry registry;
   @Value("${consumer.client-id}")
   String id;
   @Autowired
   Consumer<ConsumerRecord<K, V>> handler;
 
+  private long consumed = 0;
 
   @KafkaListener(
       id = "${consumer.client-id}",
@@ -46,6 +51,29 @@ public class EndlessConsumer<K, V>
       );
 
       handler.accept(record);
+
+      consumed++;
     }
   }
+
+
+  public synchronized void start()
+  {
+    log.info("{} - Starting - consumed {} messages before", id, consumed);
+    registry.getListenerContainer(id).start();
+  }
+
+  public synchronized void stop()
+  {
+    log.info("{} - Stopping", id);
+    registry.getListenerContainer(id).stop();
+    log.info("{} - Stopped - consumed {} messages so far", id, consumed);
+  }
+
+  @PreDestroy
+  public void destroy()
+  {
+    log.info("{} - Destroy!", id);
+    stop();
+  }
 }