Thread-Handling: Weniger Boilerplate-Code ohne `ThreadPoolTaskExecutor`
authorKai Moritz <kai@juplo.de>
Thu, 26 Sep 2024 09:05:48 +0000 (11:05 +0200)
committerKai Moritz <kai@juplo.de>
Sat, 28 Sep 2024 08:10:23 +0000 (10:10 +0200)
src/main/java/de/juplo/kafka/Application.java
src/main/java/de/juplo/kafka/ExampleProducer.java

index d269945..0069257 100644 (file)
@@ -1,64 +1,12 @@
 package de.juplo.kafka;
 
-import jakarta.annotation.PreDestroy;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.Producer;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.ApplicationArguments;
-import org.springframework.boot.ApplicationRunner;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.context.ConfigurableApplicationContext;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
-import org.springframework.util.concurrent.ListenableFuture;
-
-import java.util.concurrent.ExecutionException;
 
 
 @SpringBootApplication
-@Slf4j
-public class Application implements ApplicationRunner
+public class Application
 {
-  @Autowired
-  ThreadPoolTaskExecutor taskExecutor;
-  @Autowired
-  Producer<?, ?> kafkaProducer;
-  @Autowired
-  ExampleProducer exampleProducer;
-  @Autowired
-  ConfigurableApplicationContext context;
-
-  ListenableFuture<Integer> consumerJob;
-
-  @Override
-  public void run(ApplicationArguments args) throws Exception
-  {
-    log.info("Starting SimpleConsumer");
-    consumerJob = taskExecutor.submitListenable(exampleProducer);
-    consumerJob.addCallback(
-      exitStatus ->
-      {
-        log.info("SimpleConsumer exited normally, exit-status: {}", exitStatus);
-        SpringApplication.exit(context, () -> exitStatus);
-        },
-      t ->
-      {
-        log.error("SimpleConsumer exited abnormally!", t);
-        SpringApplication.exit(context, () -> 2);
-      });
-  }
-
-  @PreDestroy
-  public void shutdown() throws ExecutionException, InterruptedException
-  {
-    log.info("Signaling ExampleProducer to quit its work");
-    exampleProducer.shutdown();
-    log.info("Waiting for ExampleProducer to finish its work");
-    consumerJob.get();
-    log.info("ExampleProducer finished its work");
-  }
-
-
   public static void main(String[] args)
   {
     SpringApplication.run(Application.class, args);
index ccfa4ce..bbe014e 100644 (file)
@@ -1,6 +1,5 @@
 package de.juplo.kafka;
 
-import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -9,19 +8,33 @@ import java.util.concurrent.Callable;
 
 
 @Slf4j
-@RequiredArgsConstructor
-public class ExampleProducer implements Callable<Integer>
+public class ExampleProducer implements Runnable
 {
   private final String id;
   private final String topic;
   private final Producer<String, String> producer;
+  private final Thread workerThread;
 
   private volatile boolean running = true;
   private long produced = 0;
 
 
+  public ExampleProducer(
+    String id,
+    String topic,
+    Producer<String, String> producer)
+  {
+    this.id = id;
+    this.topic = topic;
+    this.producer = producer;
+
+    workerThread = new Thread(this, "ExampleProducer Worker-Thread");
+    workerThread.start();
+  }
+
+
   @Override
-  public Integer call()
+  public void run()
   {
     long i = 0;
 
@@ -35,12 +48,12 @@ public class ExampleProducer implements Callable<Integer>
     }
     catch (Exception e)
     {
-      log.error("{} - Unexpected error: {}! Produced {} messages", id, e.toString(), produced);
-      return 1;
+      log.error("{} - Unexpected error: {}!", id, e.toString());
+    }
+    finally
+    {
+      log.info("{}: Produced {} messages in total, exiting!", id, produced);
     }
-
-    log.info("{}: Produced {} messages in total, exiting!", id, produced);
-    return 0;
   }
 
   void send(String key, String value)
@@ -95,8 +108,10 @@ public class ExampleProducer implements Callable<Integer>
   }
 
 
-  public void shutdown()
+  public void shutdown() throws InterruptedException
   {
+    log.info("{} joining the worker-thread...", id);
     running = false;
+    workerThread.join();
   }
 }