TMP producer/spring-producer--Shutdown-Hängt
authorKai Moritz <kai@juplo.de>
Thu, 26 Sep 2024 08:32:01 +0000 (10:32 +0200)
committerKai Moritz <kai@juplo.de>
Thu, 26 Sep 2024 08:45:27 +0000 (10:45 +0200)
--
Als Holzweg erachtet.
Es ist eigentlich viel einfacher, nur einen Thread zu pfelgen.
Das sollte mit weniger Boilerplate-Code möglich sein, als dieser Ansatz.
Der eigentlich Auslöser für den Abbruch war aber ein mysteriöses Problem beim Debugging.
Beim Shutdown wurde immer der Default von 30 Sekunden auf die Bean `applicationTaskExecutor` gewartet.
Das macht eigentlich gar keinen Sinn, weil (die letzen verzweifelten Commits) eigentlich noch mal sicher gestellt hatten, dass _alle_ explizit ausgeführten Beans wirklich beendet werden.
Es war also die Frage, auf welche Beans gewartet wird.
Warten tut da `DefaultLifecycleProcessor#stop`.
Genau dort werden die Log-Messages vor und nach den 30 Sekunden produziert.
Hier haben aber die Breakpoints von Intellij nicht gezogen!
Auch in `ExecutorConfigurationSupport` und `ThreadPoolExecutor` nicht!
Daher konnte ich nicht herausfinden, worauf da warum gewartet wird!

src/main/java/de/juplo/kafka/Application.java
src/main/java/de/juplo/kafka/ExampleProducer.java
src/main/resources/application.yml

index aba7709..36f4d6d 100644 (file)
@@ -12,6 +12,7 @@ import org.springframework.context.ConfigurableApplicationContext;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 import org.springframework.util.concurrent.ListenableFuture;
 
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 
 
@@ -26,24 +27,20 @@ public class Application implements ApplicationRunner
   @Autowired
   ConfigurableApplicationContext context;
 
-  ListenableFuture<Integer> consumerJob;
+  CompletableFuture<Void> consumerJob;
 
   @Override
   public void run(ApplicationArguments args)
   {
     log.info("Starting ExampleProducer");
-    consumerJob = taskExecutor.submitListenable(exampleProducer);
-    consumerJob.addCallback(
-      exitStatus ->
-      {
-        log.info("ExampleProducer exited normally, exit-status: {}", exitStatus);
-        SpringApplication.exit(context, () -> exitStatus);
-        },
-      t ->
-      {
-        log.error("ExampleProducer exited abnormally!", t);
-        SpringApplication.exit(context, () -> 2);
-      });
+    consumerJob = taskExecutor.submitCompletable(exampleProducer);
+    consumerJob.thenAccept(none -> SpringApplication.exit(context, () -> 0));
+    consumerJob.exceptionally(t ->
+    {
+      log.error("ExampleProducer exited abnormally!", t);
+      SpringApplication.exit(context, () -> 1);
+      return null;
+    });
   }
 
 
index 826e1ac..88f3d9d 100644 (file)
@@ -10,7 +10,7 @@ import java.util.concurrent.Callable;
 
 @Slf4j
 @RequiredArgsConstructor
-public class ExampleProducer implements Callable<Integer>
+public class ExampleProducer implements Runnable
 {
   private final String id;
   private final String topic;
@@ -21,7 +21,7 @@ public class ExampleProducer implements Callable<Integer>
 
 
   @Override
-  public Integer call()
+  public void run()
   {
     long i = 0;
 
@@ -36,7 +36,7 @@ public class ExampleProducer implements Callable<Integer>
     catch (Exception e)
     {
       log.error("{} - Unexpected error: {}! Produced {} messages", id, e.toString(), produced);
-      return 1;
+      throw new RuntimeException(e);
     }
     finally
     {
@@ -44,8 +44,6 @@ public class ExampleProducer implements Callable<Integer>
       producer.close();
       log.info("{}: Produced {} messages in total, exiting!", id, produced);
     }
-
-    return 0;
   }
 
   void send(String key, String value)
index 1afbe1c..d777109 100644 (file)
@@ -30,6 +30,9 @@ info:
     compression-type: ${producer.compression-type}
 logging:
   level:
-    root: TRACE
+    root: INFO
+    de.juplo: DEBUG
+    org.apache.kafka: INFO
+    org.springframework: DEBUG
 server:
   port: 8880