From: Kai Moritz Date: Thu, 26 Sep 2024 08:32:01 +0000 (+0200) Subject: TMP X-Git-Tag: producer/spring-producer--Shutdown-Hängt X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=22cbed2e155172b8f214214ff4c49d1013c7a80e;p=demos%2Fkafka%2Ftraining TMP -- Als Holzweg erachtet. Es ist eigentlich viel einfacher, nur einen Thread zu pfelgen. Das sollte mit weniger Boilerplate-Code möglich sein, als dieser Ansatz. Der eigentlich Auslöser für den Abbruch war aber ein mysteriöses Problem beim Debugging. Beim Shutdown wurde immer der Default von 30 Sekunden auf die Bean `applicationTaskExecutor` gewartet. Das macht eigentlich gar keinen Sinn, weil (die letzen verzweifelten Commits) eigentlich noch mal sicher gestellt hatten, dass _alle_ explizit ausgeführten Beans wirklich beendet werden. Es war also die Frage, auf welche Beans gewartet wird. Warten tut da `DefaultLifecycleProcessor#stop`. Genau dort werden die Log-Messages vor und nach den 30 Sekunden produziert. Hier haben aber die Breakpoints von Intellij nicht gezogen! Auch in `ExecutorConfigurationSupport` und `ThreadPoolExecutor` nicht! Daher konnte ich nicht herausfinden, worauf da warum gewartet wird! --- diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java index aba7709..36f4d6d 100644 --- a/src/main/java/de/juplo/kafka/Application.java +++ b/src/main/java/de/juplo/kafka/Application.java @@ -12,6 +12,7 @@ import org.springframework.context.ConfigurableApplicationContext; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.util.concurrent.ListenableFuture; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -26,24 +27,20 @@ public class Application implements ApplicationRunner @Autowired ConfigurableApplicationContext context; - ListenableFuture consumerJob; + CompletableFuture consumerJob; @Override public void run(ApplicationArguments args) { log.info("Starting ExampleProducer"); - consumerJob = taskExecutor.submitListenable(exampleProducer); - consumerJob.addCallback( - exitStatus -> - { - log.info("ExampleProducer exited normally, exit-status: {}", exitStatus); - SpringApplication.exit(context, () -> exitStatus); - }, - t -> - { - log.error("ExampleProducer exited abnormally!", t); - SpringApplication.exit(context, () -> 2); - }); + consumerJob = taskExecutor.submitCompletable(exampleProducer); + consumerJob.thenAccept(none -> SpringApplication.exit(context, () -> 0)); + consumerJob.exceptionally(t -> + { + log.error("ExampleProducer exited abnormally!", t); + SpringApplication.exit(context, () -> 1); + return null; + }); } diff --git a/src/main/java/de/juplo/kafka/ExampleProducer.java b/src/main/java/de/juplo/kafka/ExampleProducer.java index 826e1ac..88f3d9d 100644 --- a/src/main/java/de/juplo/kafka/ExampleProducer.java +++ b/src/main/java/de/juplo/kafka/ExampleProducer.java @@ -10,7 +10,7 @@ import java.util.concurrent.Callable; @Slf4j @RequiredArgsConstructor -public class ExampleProducer implements Callable +public class ExampleProducer implements Runnable { private final String id; private final String topic; @@ -21,7 +21,7 @@ public class ExampleProducer implements Callable @Override - public Integer call() + public void run() { long i = 0; @@ -36,7 +36,7 @@ public class ExampleProducer implements Callable catch (Exception e) { log.error("{} - Unexpected error: {}! Produced {} messages", id, e.toString(), produced); - return 1; + throw new RuntimeException(e); } finally { @@ -44,8 +44,6 @@ public class ExampleProducer implements Callable producer.close(); log.info("{}: Produced {} messages in total, exiting!", id, produced); } - - return 0; } void send(String key, String value) diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 1afbe1c..d777109 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -30,6 +30,9 @@ info: compression-type: ${producer.compression-type} logging: level: - root: TRACE + root: INFO + de.juplo: DEBUG + org.apache.kafka: INFO + org.springframework: DEBUG server: port: 8880