From: Kai Moritz Date: Thu, 26 Sep 2024 09:05:48 +0000 (+0200) Subject: Thread-Handling: Weniger Boilerplate-Code ohne `ThreadPoolTaskExecutor` X-Git-Tag: producer/spring-producer--BRANCH-ENDE~1 X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=6b4660134b64ecd19d870a4aad22fd275f7dd516;p=demos%2Fkafka%2Ftraining Thread-Handling: Weniger Boilerplate-Code ohne `ThreadPoolTaskExecutor` --- diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java index d269945..0069257 100644 --- a/src/main/java/de/juplo/kafka/Application.java +++ b/src/main/java/de/juplo/kafka/Application.java @@ -1,64 +1,12 @@ package de.juplo.kafka; -import jakarta.annotation.PreDestroy; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.producer.Producer; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.ApplicationArguments; -import org.springframework.boot.ApplicationRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.context.ConfigurableApplicationContext; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; -import org.springframework.util.concurrent.ListenableFuture; - -import java.util.concurrent.ExecutionException; @SpringBootApplication -@Slf4j -public class Application implements ApplicationRunner +public class Application { - @Autowired - ThreadPoolTaskExecutor taskExecutor; - @Autowired - Producer kafkaProducer; - @Autowired - ExampleProducer exampleProducer; - @Autowired - ConfigurableApplicationContext context; - - ListenableFuture consumerJob; - - @Override - public void run(ApplicationArguments args) throws Exception - { - log.info("Starting SimpleConsumer"); - consumerJob = taskExecutor.submitListenable(exampleProducer); - consumerJob.addCallback( - exitStatus -> - { - log.info("SimpleConsumer exited normally, exit-status: {}", exitStatus); - SpringApplication.exit(context, () -> exitStatus); - }, - t -> - { - log.error("SimpleConsumer exited abnormally!", t); - SpringApplication.exit(context, () -> 2); - }); - } - - @PreDestroy - public void shutdown() throws ExecutionException, InterruptedException - { - log.info("Signaling ExampleProducer to quit its work"); - exampleProducer.shutdown(); - log.info("Waiting for ExampleProducer to finish its work"); - consumerJob.get(); - log.info("ExampleProducer finished its work"); - } - - public static void main(String[] args) { SpringApplication.run(Application.class, args); diff --git a/src/main/java/de/juplo/kafka/ExampleProducer.java b/src/main/java/de/juplo/kafka/ExampleProducer.java index ccfa4ce..bbe014e 100644 --- a/src/main/java/de/juplo/kafka/ExampleProducer.java +++ b/src/main/java/de/juplo/kafka/ExampleProducer.java @@ -1,6 +1,5 @@ package de.juplo.kafka; -import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; @@ -9,19 +8,33 @@ import java.util.concurrent.Callable; @Slf4j -@RequiredArgsConstructor -public class ExampleProducer implements Callable +public class ExampleProducer implements Runnable { private final String id; private final String topic; private final Producer producer; + private final Thread workerThread; private volatile boolean running = true; private long produced = 0; + public ExampleProducer( + String id, + String topic, + Producer producer) + { + this.id = id; + this.topic = topic; + this.producer = producer; + + workerThread = new Thread(this, "ExampleProducer Worker-Thread"); + workerThread.start(); + } + + @Override - public Integer call() + public void run() { long i = 0; @@ -35,12 +48,12 @@ public class ExampleProducer implements Callable } catch (Exception e) { - log.error("{} - Unexpected error: {}! Produced {} messages", id, e.toString(), produced); - return 1; + log.error("{} - Unexpected error: {}!", id, e.toString()); + } + finally + { + log.info("{}: Produced {} messages in total, exiting!", id, produced); } - - log.info("{}: Produced {} messages in total, exiting!", id, produced); - return 0; } void send(String key, String value) @@ -95,8 +108,10 @@ public class ExampleProducer implements Callable } - public void shutdown() + public void shutdown() throws InterruptedException { + log.info("{} joining the worker-thread...", id); running = false; + workerThread.join(); } }