From: Kai Moritz Date: Fri, 4 Nov 2022 09:53:36 +0000 (+0100) Subject: WIP:sleep X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;ds=sidebyside;h=1e858c1835d47259583cf3cce18921c550682e2b;p=demos%2Fkafka%2Ftraining WIP:sleep --- diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java index 3157ef6..718676b 100644 --- a/src/main/java/de/juplo/kafka/Application.java +++ b/src/main/java/de/juplo/kafka/Application.java @@ -19,7 +19,7 @@ import java.util.concurrent.ExecutionException; public class Application implements ApplicationRunner { @Autowired - Consumer consumer; + Consumer kafkaConsumer; @Autowired SimpleConsumer simpleConsumer; @@ -27,14 +27,25 @@ public class Application implements ApplicationRunner public void run(ApplicationArguments args) throws Exception { log.info("Starting SimpleConsumer"); - simpleConsumer.start(); + simpleConsumer.run(); } @PreDestroy public void stop() throws ExecutionException, InterruptedException { log.info("Signaling SimpleConsumer to quit its work"); - consumer.wakeup(); + kafkaConsumer.wakeup(); + + while (simpleConsumer.isRunning()) + { + log.info("Waiting for SimpleConsumer to finish its work"); + try + { + Thread.sleep(1000); + } + catch (InterruptedException e) {} + } + log.info("SimpleConsumer finished its work"); } diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 648bb9d..de77c60 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -6,7 +6,6 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.core.task.TaskExecutor; import org.springframework.kafka.core.ConsumerFactory; @@ -16,14 +15,12 @@ public class ApplicationConfiguration { @Bean public SimpleConsumer endlessConsumer( - TaskExecutor taskExecutor, Consumer kafkaConsumer, KafkaProperties kafkaProperties, ApplicationProperties applicationProperties) { return new SimpleConsumer( - taskExecutor, kafkaProperties.getClientId(), applicationProperties.getTopic(), kafkaConsumer); diff --git a/src/main/java/de/juplo/kafka/SimpleConsumer.java b/src/main/java/de/juplo/kafka/SimpleConsumer.java index 64d5176..4459a79 100644 --- a/src/main/java/de/juplo/kafka/SimpleConsumer.java +++ b/src/main/java/de/juplo/kafka/SimpleConsumer.java @@ -6,7 +6,7 @@ import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.common.errors.WakeupException; -import org.springframework.core.task.TaskExecutor; +import org.springframework.scheduling.annotation.Async; import java.time.Duration; import java.util.Arrays; @@ -14,28 +14,24 @@ import java.util.Arrays; @Slf4j @RequiredArgsConstructor -public class SimpleConsumer implements Runnable +public class SimpleConsumer { - private final TaskExecutor taskExecutor; private final String id; private final String topic; private final Consumer consumer; + private volatile boolean running = false; private long consumed = 0; - public void start() - { - taskExecutor.execute(this); - } - - @Override + @Async public void run() { try { log.info("{} - Subscribing to topic test", id); consumer.subscribe(Arrays.asList(topic)); + running = true; while (true) { @@ -69,9 +65,15 @@ public class SimpleConsumer implements Runnable } finally { + running = false; log.info("{} - Closing the KafkaConsumer", id); consumer.close(); log.info("{}: Consumed {} messages in total, exiting!", id, consumed); } } + + public boolean isRunning() + { + return running; + } }