NEU
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / KafkaServicesApplicationRunner.java
index aeec9b1..ee5834e 100644 (file)
@@ -2,6 +2,7 @@ package de.juplo.kafka.chat.backend.persistence.kafka;
 
 import jakarta.annotation.PreDestroy;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.ApplicationArguments;
 import org.springframework.boot.ApplicationRunner;
@@ -26,10 +27,14 @@ public class KafkaServicesApplicationRunner implements ApplicationRunner
   @Autowired
   ConfigurableApplicationContext context;
 
+  @Autowired
+  ChatRoomChannel chatRoomChannel;
+  @Autowired
+  Consumer<Integer, ChatRoomTo> chatRoomChannelConsumer;
   @Autowired
   ChatMessageChannel chatMessageChannel;
   @Autowired
-  ChatRoomChannel chatRoomChannel;
+  Consumer<String, MessageTo> chatMessageChannelConsumer;
 
   CompletableFuture<Void> chatRoomChannelConsumerJob;
   CompletableFuture<Void> chatMessageChannelConsumerJob;
@@ -59,6 +64,8 @@ public class KafkaServicesApplicationRunner implements ApplicationRunner
   @PreDestroy
   public void joinChatRoomChannelConsumerJob()
   {
+    log.info("Signaling the consumer of the CahtRoomChannel to quit its work");
+    chatRoomChannelConsumer.wakeup();
     log.info("Waiting for the consumer of the ChatRoomChannel to finish its work");
     chatRoomChannelConsumerJob.join();
     log.info("Joined the consumer of the ChatRoomChannel");
@@ -67,6 +74,8 @@ public class KafkaServicesApplicationRunner implements ApplicationRunner
   @PreDestroy
   public void joinChatMessageChannelConsumerJob()
   {
+    log.info("Signaling the consumer of the CahtRoomChannel to quit its work");
+    chatMessageChannelConsumer.wakeup();
     log.info("Waiting for the consumer of the ChatMessageChannel to finish its work");
     chatMessageChannelConsumerJob.join();
     log.info("Joined the consumer of the ChatMessageChannel");