From: Kai Moritz Date: Thu, 26 Sep 2024 09:05:48 +0000 (+0200) Subject: Thread-Handling: Weniger Boilerplate-Code ohne `ThreadPoolTaskExecutor` X-Git-Tag: consumer/spring-consumer--BRANCH-ENDE~5 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=19e05582ccbd732a2e1e602797c104727a7aa565;p=demos%2Fkafka%2Ftraining Thread-Handling: Weniger Boilerplate-Code ohne `ThreadPoolTaskExecutor` --- diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java index 3900c5f..0069257 100644 --- a/src/main/java/de/juplo/kafka/Application.java +++ b/src/main/java/de/juplo/kafka/Application.java @@ -1,64 +1,12 @@ package de.juplo.kafka; -import jakarta.annotation.PreDestroy; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.Consumer; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.ApplicationArguments; -import org.springframework.boot.ApplicationRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.context.ConfigurableApplicationContext; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; -import org.springframework.util.concurrent.ListenableFuture; - -import java.util.concurrent.ExecutionException; @SpringBootApplication -@Slf4j -public class Application implements ApplicationRunner +public class Application { - @Autowired - ThreadPoolTaskExecutor taskExecutor; - @Autowired - Consumer kafkaConsumer; - @Autowired - ExampleConsumer exampleConsumer; - @Autowired - ConfigurableApplicationContext context; - - ListenableFuture consumerJob; - - @Override - public void run(ApplicationArguments args) throws Exception - { - log.info("Starting ExampleConsumer"); - consumerJob = taskExecutor.submitListenable(exampleConsumer); - consumerJob.addCallback( - exitStatus -> - { - log.info("ExampleConsumer exited normally, exit-status: {}", exitStatus); - SpringApplication.exit(context, () -> exitStatus); - }, - t -> - { - log.error("ExampleConsumer exited abnormally!", t); - SpringApplication.exit(context, () -> 2); - }); - } - - @PreDestroy - public void shutdown() throws ExecutionException, InterruptedException - { - log.info("Signaling ExampleConsumer to quit its work"); - kafkaConsumer.wakeup(); - log.info("Waiting for ExampleConsumer to finish its work"); - consumerJob.get(); - log.info("ExampleConsumer finished its work"); - } - - public static void main(String[] args) { SpringApplication.run(Application.class, args); diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index a31c256..21e13a7 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -21,9 +21,9 @@ public class ApplicationConfiguration { return new ExampleConsumer( + kafkaConsumer, kafkaProperties.getClientId(), - applicationProperties.getTopics(), - kafkaConsumer); + applicationProperties.getTopics()); } @Bean diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index dd43b11..c5a5b49 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -1,6 +1,5 @@ package de.juplo.kafka; -import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -9,29 +8,44 @@ import org.apache.kafka.common.errors.WakeupException; import java.time.Duration; import java.util.Arrays; -import java.util.concurrent.Callable; @Slf4j -@RequiredArgsConstructor -public class ExampleConsumer implements Callable +public class ExampleConsumer implements Runnable { private final String id; private final String[] topics; private final Consumer consumer; + private final Thread workerThread; + private volatile boolean running = false; private long consumed = 0; + public ExampleConsumer( + Consumer consumer, + String clientId, + String... topics) + { + this.id = clientId; + this.topics = topics; + this.consumer = consumer; + + workerThread = new Thread(this, "ExampleConsumer Worker-Thread"); + workerThread.start(); + } + + @Override - public Integer call() + public void run() { try { log.info("{} - Subscribing to topics: {}", id, topics); consumer.subscribe(Arrays.asList(topics)); + running = true; - while (true) + while (running) { ConsumerRecords records = consumer.poll(Duration.ofSeconds(1)); @@ -51,18 +65,14 @@ public class ExampleConsumer implements Callable catch(WakeupException e) { log.info("{} - Consumer was signaled to finish its work", id); - return 0; } catch(Exception e) { log.error("{} - Unexpected error: {}, unsubscribing!", id, e.toString()); consumer.unsubscribe(); - return 1; } finally { - log.info("{} - Closing the KafkaConsumer", id); - consumer.close(); log.info("{}: Consumed {} messages in total, exiting!", id, consumed); } } @@ -77,4 +87,13 @@ public class ExampleConsumer implements Callable consumed++; log.info("{} - {}: {}/{} - {}={}", id, offset, topic, partition, key, value); } + + + public void shutdown() throws InterruptedException + { + log.info("{} joining the worker-thread...", id); + running = false; + consumer.wakeup(); + workerThread.join(); + } }