NEU
authorKai Moritz <kai@juplo.de>
Thu, 20 Apr 2023 11:37:35 +0000 (13:37 +0200)
committerKai Moritz <kai@juplo.de>
Thu, 20 Apr 2023 12:07:08 +0000 (14:07 +0200)
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesApplicationRunner.java

index aeec9b1..ee5834e 100644 (file)
@@ -2,6 +2,7 @@ package de.juplo.kafka.chat.backend.persistence.kafka;
 
 import jakarta.annotation.PreDestroy;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.ApplicationArguments;
 import org.springframework.boot.ApplicationRunner;
@@ -26,10 +27,14 @@ public class KafkaServicesApplicationRunner implements ApplicationRunner
   @Autowired
   ConfigurableApplicationContext context;
 
+  @Autowired
+  ChatRoomChannel chatRoomChannel;
+  @Autowired
+  Consumer<Integer, ChatRoomTo> chatRoomChannelConsumer;
   @Autowired
   ChatMessageChannel chatMessageChannel;
   @Autowired
-  ChatRoomChannel chatRoomChannel;
+  Consumer<String, MessageTo> chatMessageChannelConsumer;
 
   CompletableFuture<Void> chatRoomChannelConsumerJob;
   CompletableFuture<Void> chatMessageChannelConsumerJob;
@@ -59,6 +64,8 @@ public class KafkaServicesApplicationRunner implements ApplicationRunner
   @PreDestroy
   public void joinChatRoomChannelConsumerJob()
   {
+    log.info("Signaling the consumer of the CahtRoomChannel to quit its work");
+    chatRoomChannelConsumer.wakeup();
     log.info("Waiting for the consumer of the ChatRoomChannel to finish its work");
     chatRoomChannelConsumerJob.join();
     log.info("Joined the consumer of the ChatRoomChannel");
@@ -67,6 +74,8 @@ public class KafkaServicesApplicationRunner implements ApplicationRunner
   @PreDestroy
   public void joinChatMessageChannelConsumerJob()
   {
+    log.info("Signaling the consumer of the CahtRoomChannel to quit its work");
+    chatMessageChannelConsumer.wakeup();
     log.info("Waiting for the consumer of the ChatMessageChannel to finish its work");
     chatMessageChannelConsumerJob.join();
     log.info("Joined the consumer of the ChatMessageChannel");