WIP:scheduling
authorKai Moritz <kai@juplo.de>
Fri, 4 Nov 2022 08:37:41 +0000 (09:37 +0100)
committerKai Moritz <kai@juplo.de>
Fri, 4 Nov 2022 11:52:13 +0000 (12:52 +0100)
src/main/java/de/juplo/kafka/Application.java
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/SimpleConsumer.java

index 376c4d3..f37f3d7 100644 (file)
@@ -10,7 +10,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
 
 import javax.annotation.PreDestroy;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executor;
 import java.util.concurrent.Future;
 
 
@@ -19,19 +19,17 @@ import java.util.concurrent.Future;
 public class Application implements ApplicationRunner
 {
   @Autowired
-  ExecutorService executorService;
+  Executor executor;
   @Autowired
   Consumer<?, ?> consumer;
   @Autowired
   SimpleConsumer simpleConsumer;
 
-  Future<?> consumerJob;
-
   @Override
   public void run(ApplicationArguments args) throws Exception
   {
     log.info("Starting SimpleConsumer");
-    consumerJob = executorService.submit(simpleConsumer);
+    executor.execute(simpleConsumer);
   }
 
   @PreDestroy
@@ -39,9 +37,6 @@ public class Application implements ApplicationRunner
   {
     log.info("Signaling SimpleConsumer to quit its work");
     consumer.wakeup();
-    log.info("Waiting for SimpleConsumer to finish its work");
-    consumerJob.get();
-    log.info("SimpleConsumer finished its work");
   }
 
 
index eab9aa9..18ef37d 100644 (file)
@@ -7,36 +7,29 @@ import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
 import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.scheduling.annotation.EnableAsync;
 
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.Executor;
 
 
 @Configuration
+@EnableAsync
 @EnableConfigurationProperties({ KafkaProperties.class, ApplicationProperties.class })
 public class ApplicationConfiguration
 {
   @Bean
   public SimpleConsumer endlessConsumer(
       Consumer<String, String> kafkaConsumer,
-      ExecutorService executor,
       KafkaProperties kafkaProperties,
       ApplicationProperties applicationProperties)
   {
     return
         new SimpleConsumer(
-            executor,
             kafkaProperties.getClientId(),
             applicationProperties.getTopic(),
             kafkaConsumer);
   }
 
-  @Bean
-  public ExecutorService executor()
-  {
-    return Executors.newSingleThreadExecutor();
-  }
-
   @Bean(destroyMethod = "close")
   public Consumer<?, ?> kafkaConsumer(ConsumerFactory<?, ?> factory)
   {
index d53f5e5..53bd112 100644 (file)
@@ -9,14 +9,12 @@ import org.apache.kafka.common.errors.WakeupException;
 
 import java.time.Duration;
 import java.util.Arrays;
-import java.util.concurrent.ExecutorService;
 
 
 @Slf4j
 @RequiredArgsConstructor
 public class SimpleConsumer implements Runnable
 {
-  private final ExecutorService executor;
   private final String id;
   private final String topic;
   private final Consumer<String, String> consumer;