From eeb21755bd614c9df2d27bf4370388d5c76e1c1c Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Fri, 4 Nov 2022 09:37:41 +0100 Subject: [PATCH] WIP:scheduling --- src/main/java/de/juplo/kafka/Application.java | 11 +++-------- .../de/juplo/kafka/ApplicationConfiguration.java | 13 +++---------- src/main/java/de/juplo/kafka/SimpleConsumer.java | 2 -- 3 files changed, 6 insertions(+), 20 deletions(-) diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java index 376c4d3..f37f3d7 100644 --- a/src/main/java/de/juplo/kafka/Application.java +++ b/src/main/java/de/juplo/kafka/Application.java @@ -10,7 +10,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; import javax.annotation.PreDestroy; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executor; import java.util.concurrent.Future; @@ -19,19 +19,17 @@ import java.util.concurrent.Future; public class Application implements ApplicationRunner { @Autowired - ExecutorService executorService; + Executor executor; @Autowired Consumer consumer; @Autowired SimpleConsumer simpleConsumer; - Future consumerJob; - @Override public void run(ApplicationArguments args) throws Exception { log.info("Starting SimpleConsumer"); - consumerJob = executorService.submit(simpleConsumer); + executor.execute(simpleConsumer); } @PreDestroy @@ -39,9 +37,6 @@ public class Application implements ApplicationRunner { log.info("Signaling SimpleConsumer to quit its work"); consumer.wakeup(); - log.info("Waiting for SimpleConsumer to finish its work"); - consumerJob.get(); - log.info("SimpleConsumer finished its work"); } diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index eab9aa9..18ef37d 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -7,36 +7,29 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.scheduling.annotation.EnableAsync; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.concurrent.Executor; @Configuration +@EnableAsync @EnableConfigurationProperties({ KafkaProperties.class, ApplicationProperties.class }) public class ApplicationConfiguration { @Bean public SimpleConsumer endlessConsumer( Consumer kafkaConsumer, - ExecutorService executor, KafkaProperties kafkaProperties, ApplicationProperties applicationProperties) { return new SimpleConsumer( - executor, kafkaProperties.getClientId(), applicationProperties.getTopic(), kafkaConsumer); } - @Bean - public ExecutorService executor() - { - return Executors.newSingleThreadExecutor(); - } - @Bean(destroyMethod = "close") public Consumer kafkaConsumer(ConsumerFactory factory) { diff --git a/src/main/java/de/juplo/kafka/SimpleConsumer.java b/src/main/java/de/juplo/kafka/SimpleConsumer.java index d53f5e5..53bd112 100644 --- a/src/main/java/de/juplo/kafka/SimpleConsumer.java +++ b/src/main/java/de/juplo/kafka/SimpleConsumer.java @@ -9,14 +9,12 @@ import org.apache.kafka.common.errors.WakeupException; import java.time.Duration; import java.util.Arrays; -import java.util.concurrent.ExecutorService; @Slf4j @RequiredArgsConstructor public class SimpleConsumer implements Runnable { - private final ExecutorService executor; private final String id; private final String topic; private final Consumer consumer; -- 2.20.1