]> juplo.de Git - demos/kafka/training/commitdiff
TMP producer/spring-producer--Shutdown-Hängt
authorKai Moritz <kai@juplo.de>
Thu, 26 Sep 2024 08:32:01 +0000 (10:32 +0200)
committerKai Moritz <kai@juplo.de>
Thu, 26 Sep 2024 08:45:27 +0000 (10:45 +0200)
--
Als Holzweg erachtet.
Es ist eigentlich viel einfacher, nur einen Thread zu pfelgen.
Das sollte mit weniger Boilerplate-Code möglich sein, als dieser Ansatz.
Der eigentlich Auslöser für den Abbruch war aber ein mysteriöses Problem beim Debugging.
Beim Shutdown wurde immer der Default von 30 Sekunden auf die Bean `applicationTaskExecutor` gewartet.
Das macht eigentlich gar keinen Sinn, weil (die letzen verzweifelten Commits) eigentlich noch mal sicher gestellt hatten, dass _alle_ explizit ausgeführten Beans wirklich beendet werden.
Es war also die Frage, auf welche Beans gewartet wird.
Warten tut da `DefaultLifecycleProcessor#stop`.
Genau dort werden die Log-Messages vor und nach den 30 Sekunden produziert.
Hier haben aber die Breakpoints von Intellij nicht gezogen!
Auch in `ExecutorConfigurationSupport` und `ThreadPoolExecutor` nicht!
Daher konnte ich nicht herausfinden, worauf da warum gewartet wird!

src/main/java/de/juplo/kafka/Application.java
src/main/java/de/juplo/kafka/ExampleProducer.java
src/main/resources/application.yml

index aba77099983e86a4269e3775cf4191134f2b0fc6..36f4d6d0ce8834ca9b8479a23971713b22ca7c7c 100644 (file)
@@ -12,6 +12,7 @@ import org.springframework.context.ConfigurableApplicationContext;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 import org.springframework.util.concurrent.ListenableFuture;
 
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 
 
@@ -26,24 +27,20 @@ public class Application implements ApplicationRunner
   @Autowired
   ConfigurableApplicationContext context;
 
-  ListenableFuture<Integer> consumerJob;
+  CompletableFuture<Void> consumerJob;
 
   @Override
   public void run(ApplicationArguments args)
   {
     log.info("Starting ExampleProducer");
-    consumerJob = taskExecutor.submitListenable(exampleProducer);
-    consumerJob.addCallback(
-      exitStatus ->
-      {
-        log.info("ExampleProducer exited normally, exit-status: {}", exitStatus);
-        SpringApplication.exit(context, () -> exitStatus);
-        },
-      t ->
-      {
-        log.error("ExampleProducer exited abnormally!", t);
-        SpringApplication.exit(context, () -> 2);
-      });
+    consumerJob = taskExecutor.submitCompletable(exampleProducer);
+    consumerJob.thenAccept(none -> SpringApplication.exit(context, () -> 0));
+    consumerJob.exceptionally(t ->
+    {
+      log.error("ExampleProducer exited abnormally!", t);
+      SpringApplication.exit(context, () -> 1);
+      return null;
+    });
   }
 
 
index 826e1acfa392d786599236e495e806d5d6bb9349..88f3d9d7c61e30e82de5ceaad3170da5b7bd6b2e 100644 (file)
@@ -10,7 +10,7 @@ import java.util.concurrent.Callable;
 
 @Slf4j
 @RequiredArgsConstructor
-public class ExampleProducer implements Callable<Integer>
+public class ExampleProducer implements Runnable
 {
   private final String id;
   private final String topic;
@@ -21,7 +21,7 @@ public class ExampleProducer implements Callable<Integer>
 
 
   @Override
-  public Integer call()
+  public void run()
   {
     long i = 0;
 
@@ -36,7 +36,7 @@ public class ExampleProducer implements Callable<Integer>
     catch (Exception e)
     {
       log.error("{} - Unexpected error: {}! Produced {} messages", id, e.toString(), produced);
-      return 1;
+      throw new RuntimeException(e);
     }
     finally
     {
@@ -44,8 +44,6 @@ public class ExampleProducer implements Callable<Integer>
       producer.close();
       log.info("{}: Produced {} messages in total, exiting!", id, produced);
     }
-
-    return 0;
   }
 
   void send(String key, String value)
index 1afbe1cf820df5d273759b493dc0d3d058c519e7..d77710975480f64b4fc2db5a6ef95a4c530596cc 100644 (file)
@@ -30,6 +30,9 @@ info:
     compression-type: ${producer.compression-type}
 logging:
   level:
-    root: TRACE
+    root: INFO
+    de.juplo: DEBUG
+    org.apache.kafka: INFO
+    org.springframework: DEBUG
 server:
   port: 8880