From: Kai Moritz Date: Sat, 19 Aug 2023 10:58:27 +0000 (+0200) Subject: feat: Consumer subscribes in `KafkaServicesApplicationRunner` X-Git-Tag: rebase--2023-08-20~7 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=1afc96421246a7fd2c697911da2abb4c4fde6341;p=demos%2Fkafka%2Fchat feat: Consumer subscribes in `KafkaServicesApplicationRunner` --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java index c8bc41b0..0b35f8ce 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java @@ -199,8 +199,6 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener @Override public void run() { - consumer.subscribe(List.of(topic), this); - running = true; while (running) diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesApplicationRunner.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesApplicationRunner.java index 5c7e0d8c..8474239b 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesApplicationRunner.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesApplicationRunner.java @@ -1,5 +1,6 @@ package de.juplo.kafka.chat.backend.persistence.kafka; +import de.juplo.kafka.chat.backend.ChatBackendProperties; import de.juplo.kafka.chat.backend.persistence.kafka.messages.AbstractMessageTo; import jakarta.annotation.PreDestroy; import lombok.extern.slf4j.Slf4j; @@ -12,6 +13,7 @@ import org.springframework.context.ConfigurableApplicationContext; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; +import java.util.List; import java.util.concurrent.CompletableFuture; @@ -23,6 +25,9 @@ import java.util.concurrent.CompletableFuture; @Slf4j public class KafkaServicesApplicationRunner implements ApplicationRunner { + @Autowired + ChatBackendProperties properties; + @Autowired ThreadPoolTaskExecutor taskExecutor; @Autowired @@ -39,6 +44,8 @@ public class KafkaServicesApplicationRunner implements ApplicationRunner @Override public void run(ApplicationArguments args) throws Exception { + List topics = List.of(properties.getKafka().getChatRoomChannelTopic()); + chatRoomChannelConsumer.subscribe(topics, chatRoomChannel); log.info("Starting the consumer for the ChatRoomChannel"); chatRoomChannelConsumerJob = taskExecutor .submitCompletable(chatRoomChannel)