From: Kai Moritz Date: Fri, 4 Nov 2022 08:45:59 +0000 (+0100) Subject: WIP:async X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;ds=sidebyside;h=b059b0e509cca9a16ae209ade49c967a66201de9;p=demos%2Fkafka%2Ftraining WIP:async --- diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java index f37f3d7..513a293 100644 --- a/src/main/java/de/juplo/kafka/Application.java +++ b/src/main/java/de/juplo/kafka/Application.java @@ -7,19 +7,18 @@ import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.scheduling.annotation.Async; +import org.springframework.scheduling.annotation.EnableAsync; import javax.annotation.PreDestroy; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executor; -import java.util.concurrent.Future; @SpringBootApplication +@EnableAsync @Slf4j public class Application implements ApplicationRunner { - @Autowired - Executor executor; @Autowired Consumer consumer; @Autowired @@ -27,13 +26,19 @@ public class Application implements ApplicationRunner @Override public void run(ApplicationArguments args) throws Exception + { + start(); + } + + @Async + public void start() { log.info("Starting SimpleConsumer"); - executor.execute(simpleConsumer); + simpleConsumer.run(); } @PreDestroy - public void shutdown() throws ExecutionException, InterruptedException + public void stop() throws ExecutionException, InterruptedException { log.info("Signaling SimpleConsumer to quit its work"); consumer.wakeup(); diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 18ef37d..de77c60 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -7,13 +7,9 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.core.ConsumerFactory; -import org.springframework.scheduling.annotation.EnableAsync; - -import java.util.concurrent.Executor; @Configuration -@EnableAsync @EnableConfigurationProperties({ KafkaProperties.class, ApplicationProperties.class }) public class ApplicationConfiguration { diff --git a/src/main/java/de/juplo/kafka/SimpleConsumer.java b/src/main/java/de/juplo/kafka/SimpleConsumer.java index 53bd112..5305ceb 100644 --- a/src/main/java/de/juplo/kafka/SimpleConsumer.java +++ b/src/main/java/de/juplo/kafka/SimpleConsumer.java @@ -13,7 +13,7 @@ import java.util.Arrays; @Slf4j @RequiredArgsConstructor -public class SimpleConsumer implements Runnable +public class SimpleConsumer { private final String id; private final String topic; @@ -22,7 +22,6 @@ public class SimpleConsumer implements Runnable private long consumed = 0; - @Override public void run() { try