Thread-Handling: Weniger Boilerplate-Code ohne `ThreadPoolTaskExecutor`
authorKai Moritz <kai@juplo.de>
Thu, 26 Sep 2024 09:05:48 +0000 (11:05 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 29 Sep 2024 19:22:58 +0000 (21:22 +0200)
src/main/java/de/juplo/kafka/Application.java
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/ExampleConsumer.java

index 3900c5f..0069257 100644 (file)
@@ -1,64 +1,12 @@
 package de.juplo.kafka;
 
-import jakarta.annotation.PreDestroy;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.ApplicationArguments;
-import org.springframework.boot.ApplicationRunner;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.context.ConfigurableApplicationContext;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
-import org.springframework.util.concurrent.ListenableFuture;
-
-import java.util.concurrent.ExecutionException;
 
 
 @SpringBootApplication
-@Slf4j
-public class Application implements ApplicationRunner
+public class Application
 {
-  @Autowired
-  ThreadPoolTaskExecutor taskExecutor;
-  @Autowired
-  Consumer<?, ?> kafkaConsumer;
-  @Autowired
-  ExampleConsumer exampleConsumer;
-  @Autowired
-  ConfigurableApplicationContext context;
-
-  ListenableFuture<Integer> consumerJob;
-
-  @Override
-  public void run(ApplicationArguments args) throws Exception
-  {
-    log.info("Starting ExampleConsumer");
-    consumerJob = taskExecutor.submitListenable(exampleConsumer);
-    consumerJob.addCallback(
-      exitStatus ->
-      {
-        log.info("ExampleConsumer exited normally, exit-status: {}", exitStatus);
-        SpringApplication.exit(context, () -> exitStatus);
-        },
-      t ->
-      {
-        log.error("ExampleConsumer exited abnormally!", t);
-        SpringApplication.exit(context, () -> 2);
-      });
-  }
-
-  @PreDestroy
-  public void shutdown() throws ExecutionException, InterruptedException
-  {
-    log.info("Signaling ExampleConsumer to quit its work");
-    kafkaConsumer.wakeup();
-    log.info("Waiting for ExampleConsumer to finish its work");
-    consumerJob.get();
-    log.info("ExampleConsumer finished its work");
-  }
-
-
   public static void main(String[] args)
   {
     SpringApplication.run(Application.class, args);
index a31c256..21e13a7 100644 (file)
@@ -21,9 +21,9 @@ public class ApplicationConfiguration
   {
     return
         new ExampleConsumer(
+            kafkaConsumer,
             kafkaProperties.getClientId(),
-            applicationProperties.getTopics(),
-            kafkaConsumer);
+            applicationProperties.getTopics());
   }
 
   @Bean
index dd43b11..c5a5b49 100644 (file)
@@ -1,6 +1,5 @@
 package de.juplo.kafka;
 
-import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -9,29 +8,44 @@ import org.apache.kafka.common.errors.WakeupException;
 
 import java.time.Duration;
 import java.util.Arrays;
-import java.util.concurrent.Callable;
 
 
 @Slf4j
-@RequiredArgsConstructor
-public class ExampleConsumer implements Callable<Integer>
+public class ExampleConsumer implements Runnable
 {
   private final String id;
   private final String[] topics;
   private final Consumer<String, String> consumer;
+  private final Thread workerThread;
 
+  private volatile boolean running = false;
   private long consumed = 0;
 
 
+  public ExampleConsumer(
+    Consumer<String, String> consumer,
+    String clientId,
+    String... topics)
+  {
+    this.id = clientId;
+    this.topics = topics;
+    this.consumer = consumer;
+
+    workerThread = new Thread(this, "ExampleConsumer Worker-Thread");
+    workerThread.start();
+  }
+
+
   @Override
-  public Integer call()
+  public void run()
   {
     try
     {
       log.info("{} - Subscribing to topics: {}", id, topics);
       consumer.subscribe(Arrays.asList(topics));
+      running = true;
 
-      while (true)
+      while (running)
       {
         ConsumerRecords<String, String> records =
             consumer.poll(Duration.ofSeconds(1));
@@ -51,18 +65,14 @@ public class ExampleConsumer implements Callable<Integer>
     catch(WakeupException e)
     {
       log.info("{} - Consumer was signaled to finish its work", id);
-      return 0;
     }
     catch(Exception e)
     {
       log.error("{} - Unexpected error: {}, unsubscribing!", id, e.toString());
       consumer.unsubscribe();
-      return 1;
     }
     finally
     {
-      log.info("{} - Closing the KafkaConsumer", id);
-      consumer.close();
       log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
     }
   }
@@ -77,4 +87,13 @@ public class ExampleConsumer implements Callable<Integer>
     consumed++;
     log.info("{} - {}: {}/{} - {}={}", id, offset, topic, partition, key, value);
   }
+
+
+  public void shutdown() throws InterruptedException
+  {
+    log.info("{} joining the worker-thread...", id);
+    running = false;
+    consumer.wakeup();
+    workerThread.join();
+  }
 }