From d1604cb70a2074e2f2182caba0d0f28d53f5d95a Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Fri, 4 Nov 2022 10:55:35 +0100 Subject: [PATCH] WIP:TaskExecutor-NEU --- src/main/java/de/juplo/kafka/Application.java | 21 ++++++------------- .../java/de/juplo/kafka/SimpleConsumer.java | 13 ++---------- 2 files changed, 8 insertions(+), 26 deletions(-) diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java index 718676b..a9b466d 100644 --- a/src/main/java/de/juplo/kafka/Application.java +++ b/src/main/java/de/juplo/kafka/Application.java @@ -7,17 +7,19 @@ import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.scheduling.annotation.EnableAsync; import javax.annotation.PreDestroy; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.Future; @SpringBootApplication -@EnableAsync @Slf4j public class Application implements ApplicationRunner { + @Autowired + Executor executor; @Autowired Consumer kafkaConsumer; @Autowired @@ -27,25 +29,14 @@ public class Application implements ApplicationRunner public void run(ApplicationArguments args) throws Exception { log.info("Starting SimpleConsumer"); - simpleConsumer.run(); + executor.execute(simpleConsumer); } @PreDestroy - public void stop() throws ExecutionException, InterruptedException + public void shutdown() throws ExecutionException, InterruptedException { log.info("Signaling SimpleConsumer to quit its work"); kafkaConsumer.wakeup(); - - while (simpleConsumer.isRunning()) - { - log.info("Waiting for SimpleConsumer to finish its work"); - try - { - Thread.sleep(1000); - } - catch (InterruptedException e) {} - } - log.info("SimpleConsumer finished its work"); } diff --git a/src/main/java/de/juplo/kafka/SimpleConsumer.java b/src/main/java/de/juplo/kafka/SimpleConsumer.java index 4459a79..53bd112 100644 --- a/src/main/java/de/juplo/kafka/SimpleConsumer.java +++ b/src/main/java/de/juplo/kafka/SimpleConsumer.java @@ -6,7 +6,6 @@ import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.common.errors.WakeupException; -import org.springframework.scheduling.annotation.Async; import java.time.Duration; import java.util.Arrays; @@ -14,24 +13,22 @@ import java.util.Arrays; @Slf4j @RequiredArgsConstructor -public class SimpleConsumer +public class SimpleConsumer implements Runnable { private final String id; private final String topic; private final Consumer consumer; - private volatile boolean running = false; private long consumed = 0; - @Async + @Override public void run() { try { log.info("{} - Subscribing to topic test", id); consumer.subscribe(Arrays.asList(topic)); - running = true; while (true) { @@ -65,15 +62,9 @@ public class SimpleConsumer } finally { - running = false; log.info("{} - Closing the KafkaConsumer", id); consumer.close(); log.info("{}: Consumed {} messages in total, exiting!", id, consumed); } } - - public boolean isRunning() - { - return running; - } } -- 2.20.1