From 22cbed2e155172b8f214214ff4c49d1013c7a80e Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Thu, 26 Sep 2024 10:32:01 +0200 Subject: [PATCH] =?utf8?q?TMP=20--=20Als=20Holzweg=20erachtet.=20Es=20ist?= =?utf8?q?=20eigentlich=20viel=20einfacher,=20nur=20einen=20Thread=20zu=20?= =?utf8?q?pfelgen.=20Das=20sollte=20mit=20weniger=20Boilerplate-Code=20m?= =?utf8?q?=C3=B6glich=20sein,=20als=20dieser=20Ansatz.=20Der=20eigentlich?= =?utf8?q?=20Ausl=C3=B6ser=20f=C3=BCr=20den=20Abbruch=20war=20aber=20ein?= =?utf8?q?=20mysteri=C3=B6ses=20Problem=20beim=20Debugging.=20Beim=20Shutd?= =?utf8?q?own=20wurde=20immer=20der=20Default=20von=2030=20Sekunden=20auf?= =?utf8?q?=20die=20Bean=20`applicationTaskExecutor`=20gewartet.=20Das=20ma?= =?utf8?q?cht=20eigentlich=20gar=20keinen=20Sinn,=20weil=20(die=20letzen?= =?utf8?q?=20verzweifelten=20Commits)=20eigentlich=20noch=20mal=20sicher?= =?utf8?q?=20gestellt=20hatten,=20dass=20=5Falle=5F=20explizit=20ausgef?= =?utf8?q?=C3=BChrten=20Beans=20wirklich=20beendet=20werden.=20Es=20war=20?= =?utf8?q?also=20die=20Frage,=20auf=20welche=20Beans=20gewartet=20wird.=20?= =?utf8?q?Warten=20tut=20da=20`DefaultLifecycleProcessor#stop`.=20Genau=20?= =?utf8?q?dort=20werden=20die=20Log-Messages=20vor=20und=20nach=20den=2030?= =?utf8?q?=20Sekunden=20produziert.=20Hier=20haben=20aber=20die=20Breakpoi?= =?utf8?q?nts=20von=20Intellij=20nicht=20gezogen!=20Auch=20in=20`ExecutorC?= =?utf8?q?onfigurationSupport`=20und=20`ThreadPoolExecutor`=20nicht!=20Dah?= =?utf8?q?er=20konnte=20ich=20nicht=20herausfinden,=20worauf=20da=20warum?= =?utf8?q?=20gewartet=20wird!?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- src/main/java/de/juplo/kafka/Application.java | 23 ++++++++----------- .../java/de/juplo/kafka/ExampleProducer.java | 8 +++---- src/main/resources/application.yml | 5 +++- 3 files changed, 17 insertions(+), 19 deletions(-) diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java index aba7709..36f4d6d 100644 --- a/src/main/java/de/juplo/kafka/Application.java +++ b/src/main/java/de/juplo/kafka/Application.java @@ -12,6 +12,7 @@ import org.springframework.context.ConfigurableApplicationContext; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.util.concurrent.ListenableFuture; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -26,24 +27,20 @@ public class Application implements ApplicationRunner @Autowired ConfigurableApplicationContext context; - ListenableFuture consumerJob; + CompletableFuture consumerJob; @Override public void run(ApplicationArguments args) { log.info("Starting ExampleProducer"); - consumerJob = taskExecutor.submitListenable(exampleProducer); - consumerJob.addCallback( - exitStatus -> - { - log.info("ExampleProducer exited normally, exit-status: {}", exitStatus); - SpringApplication.exit(context, () -> exitStatus); - }, - t -> - { - log.error("ExampleProducer exited abnormally!", t); - SpringApplication.exit(context, () -> 2); - }); + consumerJob = taskExecutor.submitCompletable(exampleProducer); + consumerJob.thenAccept(none -> SpringApplication.exit(context, () -> 0)); + consumerJob.exceptionally(t -> + { + log.error("ExampleProducer exited abnormally!", t); + SpringApplication.exit(context, () -> 1); + return null; + }); } diff --git a/src/main/java/de/juplo/kafka/ExampleProducer.java b/src/main/java/de/juplo/kafka/ExampleProducer.java index 826e1ac..88f3d9d 100644 --- a/src/main/java/de/juplo/kafka/ExampleProducer.java +++ b/src/main/java/de/juplo/kafka/ExampleProducer.java @@ -10,7 +10,7 @@ import java.util.concurrent.Callable; @Slf4j @RequiredArgsConstructor -public class ExampleProducer implements Callable +public class ExampleProducer implements Runnable { private final String id; private final String topic; @@ -21,7 +21,7 @@ public class ExampleProducer implements Callable @Override - public Integer call() + public void run() { long i = 0; @@ -36,7 +36,7 @@ public class ExampleProducer implements Callable catch (Exception e) { log.error("{} - Unexpected error: {}! Produced {} messages", id, e.toString(), produced); - return 1; + throw new RuntimeException(e); } finally { @@ -44,8 +44,6 @@ public class ExampleProducer implements Callable producer.close(); log.info("{}: Produced {} messages in total, exiting!", id, produced); } - - return 0; } void send(String key, String value) diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 1afbe1c..d777109 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -30,6 +30,9 @@ info: compression-type: ${producer.compression-type} logging: level: - root: TRACE + root: INFO + de.juplo: DEBUG + org.apache.kafka: INFO + org.springframework: DEBUG server: port: 8880 -- 2.20.1