From c856a4c8914912fd866a445aa744a155b3eb37d9 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Thu, 20 Apr 2023 13:37:35 +0200 Subject: [PATCH] NEU --- .../kafka/KafkaServicesApplicationRunner.java | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesApplicationRunner.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesApplicationRunner.java index aeec9b14..ee5834e5 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesApplicationRunner.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesApplicationRunner.java @@ -2,6 +2,7 @@ package de.juplo.kafka.chat.backend.persistence.kafka; import jakarta.annotation.PreDestroy; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.Consumer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; @@ -26,10 +27,14 @@ public class KafkaServicesApplicationRunner implements ApplicationRunner @Autowired ConfigurableApplicationContext context; + @Autowired + ChatRoomChannel chatRoomChannel; + @Autowired + Consumer chatRoomChannelConsumer; @Autowired ChatMessageChannel chatMessageChannel; @Autowired - ChatRoomChannel chatRoomChannel; + Consumer chatMessageChannelConsumer; CompletableFuture chatRoomChannelConsumerJob; CompletableFuture chatMessageChannelConsumerJob; @@ -59,6 +64,8 @@ public class KafkaServicesApplicationRunner implements ApplicationRunner @PreDestroy public void joinChatRoomChannelConsumerJob() { + log.info("Signaling the consumer of the CahtRoomChannel to quit its work"); + chatRoomChannelConsumer.wakeup(); log.info("Waiting for the consumer of the ChatRoomChannel to finish its work"); chatRoomChannelConsumerJob.join(); log.info("Joined the consumer of the ChatRoomChannel"); @@ -67,6 +74,8 @@ public class KafkaServicesApplicationRunner implements ApplicationRunner @PreDestroy public void joinChatMessageChannelConsumerJob() { + log.info("Signaling the consumer of the CahtRoomChannel to quit its work"); + chatMessageChannelConsumer.wakeup(); log.info("Waiting for the consumer of the ChatMessageChannel to finish its work"); chatMessageChannelConsumerJob.join(); log.info("Joined the consumer of the ChatMessageChannel"); -- 2.20.1