feat: Consumer subscribes in `KafkaServicesApplicationRunner`
authorKai Moritz <kai@juplo.de>
Sat, 19 Aug 2023 10:58:27 +0000 (12:58 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 20 Aug 2023 08:58:38 +0000 (10:58 +0200)
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesApplicationRunner.java

index c8bc41b..0b35f8c 100644 (file)
@@ -199,8 +199,6 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
   @Override
   public void run()
   {
-    consumer.subscribe(List.of(topic), this);
-
     running = true;
 
     while (running)
index 5c7e0d8..8474239 100644 (file)
@@ -1,5 +1,6 @@
 package de.juplo.kafka.chat.backend.persistence.kafka;
 
+import de.juplo.kafka.chat.backend.ChatBackendProperties;
 import de.juplo.kafka.chat.backend.persistence.kafka.messages.AbstractMessageTo;
 import jakarta.annotation.PreDestroy;
 import lombok.extern.slf4j.Slf4j;
@@ -12,6 +13,7 @@ import org.springframework.context.ConfigurableApplicationContext;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 import org.springframework.stereotype.Component;
 
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 
 
@@ -23,6 +25,9 @@ import java.util.concurrent.CompletableFuture;
 @Slf4j
 public class KafkaServicesApplicationRunner implements ApplicationRunner
 {
+  @Autowired
+  ChatBackendProperties properties;
+
   @Autowired
   ThreadPoolTaskExecutor taskExecutor;
   @Autowired
@@ -39,6 +44,8 @@ public class KafkaServicesApplicationRunner implements ApplicationRunner
   @Override
   public void run(ApplicationArguments args) throws Exception
   {
+    List<String> topics = List.of(properties.getKafka().getChatRoomChannelTopic());
+    chatRoomChannelConsumer.subscribe(topics, chatRoomChannel);
     log.info("Starting the consumer for the ChatRoomChannel");
     chatRoomChannelConsumerJob = taskExecutor
         .submitCompletable(chatRoomChannel)