package de.juplo.kafka.chat.backend.persistence.kafka;
+import de.juplo.kafka.chat.backend.ChatBackendProperties;
import de.juplo.kafka.chat.backend.persistence.kafka.messages.AbstractMessageTo;
import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
@Slf4j
public class KafkaServicesApplicationRunner implements ApplicationRunner
{
+ @Autowired
+ ChatBackendProperties properties;
+
@Autowired
ThreadPoolTaskExecutor taskExecutor;
@Autowired
@Override
public void run(ApplicationArguments args) throws Exception
{
+ List<String> topics = List.of(properties.getKafka().getChatRoomChannelTopic());
+ chatRoomChannelConsumer.subscribe(topics, chatRoomChannel);
log.info("Starting the consumer for the ChatRoomChannel");
chatRoomChannelConsumerJob = taskExecutor
.submitCompletable(chatRoomChannel)