From 6b4660134b64ecd19d870a4aad22fd275f7dd516 Mon Sep 17 00:00:00 2001
From: Kai Moritz <kai@juplo.de>
Date: Thu, 26 Sep 2024 11:05:48 +0200
Subject: [PATCH] Thread-Handling: Weniger Boilerplate-Code ohne
 `ThreadPoolTaskExecutor`

---
 src/main/java/de/juplo/kafka/Application.java | 54 +------------------
 .../java/de/juplo/kafka/ExampleProducer.java  | 35 ++++++++----
 2 files changed, 26 insertions(+), 63 deletions(-)

diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java
index d2699457..0069257f 100644
--- a/src/main/java/de/juplo/kafka/Application.java
+++ b/src/main/java/de/juplo/kafka/Application.java
@@ -1,64 +1,12 @@
 package de.juplo.kafka;
 
-import jakarta.annotation.PreDestroy;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.Producer;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.ApplicationArguments;
-import org.springframework.boot.ApplicationRunner;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.context.ConfigurableApplicationContext;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
-import org.springframework.util.concurrent.ListenableFuture;
-
-import java.util.concurrent.ExecutionException;
 
 
 @SpringBootApplication
-@Slf4j
-public class Application implements ApplicationRunner
+public class Application
 {
-  @Autowired
-  ThreadPoolTaskExecutor taskExecutor;
-  @Autowired
-  Producer<?, ?> kafkaProducer;
-  @Autowired
-  ExampleProducer exampleProducer;
-  @Autowired
-  ConfigurableApplicationContext context;
-
-  ListenableFuture<Integer> consumerJob;
-
-  @Override
-  public void run(ApplicationArguments args) throws Exception
-  {
-    log.info("Starting SimpleConsumer");
-    consumerJob = taskExecutor.submitListenable(exampleProducer);
-    consumerJob.addCallback(
-      exitStatus ->
-      {
-        log.info("SimpleConsumer exited normally, exit-status: {}", exitStatus);
-        SpringApplication.exit(context, () -> exitStatus);
-        },
-      t ->
-      {
-        log.error("SimpleConsumer exited abnormally!", t);
-        SpringApplication.exit(context, () -> 2);
-      });
-  }
-
-  @PreDestroy
-  public void shutdown() throws ExecutionException, InterruptedException
-  {
-    log.info("Signaling ExampleProducer to quit its work");
-    exampleProducer.shutdown();
-    log.info("Waiting for ExampleProducer to finish its work");
-    consumerJob.get();
-    log.info("ExampleProducer finished its work");
-  }
-
-
   public static void main(String[] args)
   {
     SpringApplication.run(Application.class, args);
diff --git a/src/main/java/de/juplo/kafka/ExampleProducer.java b/src/main/java/de/juplo/kafka/ExampleProducer.java
index ccfa4ce9..bbe014e3 100644
--- a/src/main/java/de/juplo/kafka/ExampleProducer.java
+++ b/src/main/java/de/juplo/kafka/ExampleProducer.java
@@ -1,6 +1,5 @@
 package de.juplo.kafka;
 
-import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -9,19 +8,33 @@ import java.util.concurrent.Callable;
 
 
 @Slf4j
-@RequiredArgsConstructor
-public class ExampleProducer implements Callable<Integer>
+public class ExampleProducer implements Runnable
 {
   private final String id;
   private final String topic;
   private final Producer<String, String> producer;
+  private final Thread workerThread;
 
   private volatile boolean running = true;
   private long produced = 0;
 
 
+  public ExampleProducer(
+    String id,
+    String topic,
+    Producer<String, String> producer)
+  {
+    this.id = id;
+    this.topic = topic;
+    this.producer = producer;
+
+    workerThread = new Thread(this, "ExampleProducer Worker-Thread");
+    workerThread.start();
+  }
+
+
   @Override
-  public Integer call()
+  public void run()
   {
     long i = 0;
 
@@ -35,12 +48,12 @@ public class ExampleProducer implements Callable<Integer>
     }
     catch (Exception e)
     {
-      log.error("{} - Unexpected error: {}! Produced {} messages", id, e.toString(), produced);
-      return 1;
+      log.error("{} - Unexpected error: {}!", id, e.toString());
+    }
+    finally
+    {
+      log.info("{}: Produced {} messages in total, exiting!", id, produced);
     }
-
-    log.info("{}: Produced {} messages in total, exiting!", id, produced);
-    return 0;
   }
 
   void send(String key, String value)
@@ -95,8 +108,10 @@ public class ExampleProducer implements Callable<Integer>
   }
 
 
-  public void shutdown()
+  public void shutdown() throws InterruptedException
   {
+    log.info("{} joining the worker-thread...", id);
     running = false;
+    workerThread.join();
   }
 }
-- 
2.20.1