From 1afc96421246a7fd2c697911da2abb4c4fde6341 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 19 Aug 2023 12:58:27 +0200 Subject: [PATCH] feat: Consumer subscribes in `KafkaServicesApplicationRunner` --- .../chat/backend/persistence/kafka/ChatRoomChannel.java | 2 -- .../persistence/kafka/KafkaServicesApplicationRunner.java | 7 +++++++ 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java index c8bc41b0..0b35f8ce 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java @@ -199,8 +199,6 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener @Override public void run() { - consumer.subscribe(List.of(topic), this); - running = true; while (running) diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesApplicationRunner.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesApplicationRunner.java index 5c7e0d8c..8474239b 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesApplicationRunner.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesApplicationRunner.java @@ -1,5 +1,6 @@ package de.juplo.kafka.chat.backend.persistence.kafka; +import de.juplo.kafka.chat.backend.ChatBackendProperties; import de.juplo.kafka.chat.backend.persistence.kafka.messages.AbstractMessageTo; import jakarta.annotation.PreDestroy; import lombok.extern.slf4j.Slf4j; @@ -12,6 +13,7 @@ import org.springframework.context.ConfigurableApplicationContext; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; +import java.util.List; import java.util.concurrent.CompletableFuture; @@ -23,6 +25,9 @@ import java.util.concurrent.CompletableFuture; @Slf4j public class KafkaServicesApplicationRunner implements ApplicationRunner { + @Autowired + ChatBackendProperties properties; + @Autowired ThreadPoolTaskExecutor taskExecutor; @Autowired @@ -39,6 +44,8 @@ public class KafkaServicesApplicationRunner implements ApplicationRunner @Override public void run(ApplicationArguments args) throws Exception { + List topics = List.of(properties.getKafka().getChatRoomChannelTopic()); + chatRoomChannelConsumer.subscribe(topics, chatRoomChannel); log.info("Starting the consumer for the ChatRoomChannel"); chatRoomChannelConsumerJob = taskExecutor .submitCompletable(chatRoomChannel) -- 2.20.1