X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fpersistence%2Fkafka%2FKafkaServicesApplicationRunner.java;h=8a9e32ed3339c8be73403ac0b4414a730e2f38ac;hb=1416ccc8a9eae999201dbf7c77c4d4906fc9fc24;hp=aeec9b149e170e359624df89e978bb35e8681884;hpb=f87d3e2fea3ee107d050bedc18d66471ae0fdd7e;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesApplicationRunner.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesApplicationRunner.java index aeec9b14..8a9e32ed 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesApplicationRunner.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesApplicationRunner.java @@ -2,6 +2,7 @@ package de.juplo.kafka.chat.backend.persistence.kafka; import jakarta.annotation.PreDestroy; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.Consumer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; @@ -26,10 +27,14 @@ public class KafkaServicesApplicationRunner implements ApplicationRunner @Autowired ConfigurableApplicationContext context; + @Autowired + ChatRoomChannel chatRoomChannel; + @Autowired + Consumer chatRoomChannelConsumer; @Autowired ChatMessageChannel chatMessageChannel; @Autowired - ChatRoomChannel chatRoomChannel; + Consumer chatMessageChannelConsumer; CompletableFuture chatRoomChannelConsumerJob; CompletableFuture chatMessageChannelConsumerJob; @@ -59,6 +64,8 @@ public class KafkaServicesApplicationRunner implements ApplicationRunner @PreDestroy public void joinChatRoomChannelConsumerJob() { + log.info("Signaling the consumer of the CahtRoomChannel to quit its work"); + chatRoomChannelConsumer.wakeup(); log.info("Waiting for the consumer of the ChatRoomChannel to finish its work"); chatRoomChannelConsumerJob.join(); log.info("Joined the consumer of the ChatRoomChannel"); @@ -67,6 +74,8 @@ public class KafkaServicesApplicationRunner implements ApplicationRunner @PreDestroy public void joinChatMessageChannelConsumerJob() { + log.info("Signaling the consumer of the CahtRoomChannel to quit its work"); + chatMessageChannelConsumer.wakeup(); log.info("Waiting for the consumer of the ChatMessageChannel to finish its work"); chatMessageChannelConsumerJob.join(); log.info("Joined the consumer of the ChatMessageChannel");