From: Kai Moritz Date: Thu, 20 Apr 2023 11:37:35 +0000 (+0200) Subject: NEU X-Git-Tag: rebase--2023-08-18~16 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=d50a4fbdddd6aaba0392e9bdd0b8a4a7ba89a8f1;p=demos%2Fkafka%2Fchat NEU --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesApplicationRunner.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesApplicationRunner.java index aeec9b14..ee5834e5 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesApplicationRunner.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesApplicationRunner.java @@ -2,6 +2,7 @@ package de.juplo.kafka.chat.backend.persistence.kafka; import jakarta.annotation.PreDestroy; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.Consumer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; @@ -26,10 +27,14 @@ public class KafkaServicesApplicationRunner implements ApplicationRunner @Autowired ConfigurableApplicationContext context; + @Autowired + ChatRoomChannel chatRoomChannel; + @Autowired + Consumer chatRoomChannelConsumer; @Autowired ChatMessageChannel chatMessageChannel; @Autowired - ChatRoomChannel chatRoomChannel; + Consumer chatMessageChannelConsumer; CompletableFuture chatRoomChannelConsumerJob; CompletableFuture chatMessageChannelConsumerJob; @@ -59,6 +64,8 @@ public class KafkaServicesApplicationRunner implements ApplicationRunner @PreDestroy public void joinChatRoomChannelConsumerJob() { + log.info("Signaling the consumer of the CahtRoomChannel to quit its work"); + chatRoomChannelConsumer.wakeup(); log.info("Waiting for the consumer of the ChatRoomChannel to finish its work"); chatRoomChannelConsumerJob.join(); log.info("Joined the consumer of the ChatRoomChannel"); @@ -67,6 +74,8 @@ public class KafkaServicesApplicationRunner implements ApplicationRunner @PreDestroy public void joinChatMessageChannelConsumerJob() { + log.info("Signaling the consumer of the CahtRoomChannel to quit its work"); + chatMessageChannelConsumer.wakeup(); log.info("Waiting for the consumer of the ChatMessageChannel to finish its work"); chatMessageChannelConsumerJob.join(); log.info("Joined the consumer of the ChatMessageChannel");