X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;ds=sidebyside;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fpersistence%2Fkafka%2FKafkaServicesConfiguration.java;h=5367fdf0841c873feda731b860add63fcf2b4a16;hb=e7150b6822c45c520db73a96785dc0a8a81f503b;hp=55aa6f84802fae279932f9bcd0d15c82f27c99f0;hpb=2a47c081a50f8ebdda6db9955c3ddc69e5c601c2;p=demos%2Fkafka%2Fchat diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java index 55aa6f84..5367fdf0 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java @@ -9,15 +9,24 @@ import de.juplo.kafka.chat.backend.domain.SimpleChatHome; import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy; import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatHomeService; import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomFactory; +import jakarta.annotation.PreDestroy; +import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.producer.Producer; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.util.concurrent.ListenableFuture; import java.time.Clock; import java.time.ZoneId; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; @ConditionalOnProperty( @@ -25,8 +34,61 @@ import java.time.ZoneId; name = "services", havingValue = "kafka") @Configuration +@Slf4j public class KafkaServicesConfiguration implements ApplicationRunner { + @Autowired + ThreadPoolTaskExecutor taskExecutor; + @Autowired + ConfigurableApplicationContext context; + + @Autowired + ChatMessageChannel chatMessageChannel; + @Autowired + ChatRoomChannel chatRoomChannel; + + CompletableFuture chatRoomChannelConsumerJob; + CompletableFuture chatMessageChannelConsumerJob; + + + @Override + public void run(ApplicationArguments args) throws Exception + { + log.info("Starting the consumer for the ChatRoomChannel"); + chatRoomChannelConsumerJob = taskExecutor + .submitCompletable(chatRoomChannel) + .exceptionally(e -> + { + log.error("The consumer for the ChatRoomChannel exited abnormally!", e); + return null; + }); + log.info("Starting the consumer for the ChatMessageChannel"); + chatMessageChannelConsumerJob = taskExecutor + .submitCompletable(chatMessageChannel) + .exceptionally(e -> + { + log.error("The consumer for the ChatMessageChannel exited abnormally!", e); + return null; + }); + } + + @PreDestroy + public void joinChatRoomChannelConsumerJob() + { + log.info("Waiting for the consumer of the ChatRoomChannel to finish its work"); + chatRoomChannelConsumerJob.join(); + log.info("Joined the consumer of the ChatRoomChannel"); + } + + @PreDestroy + public void joinChatMessageChannelConsumerJob() + { + log.info("Waiting for the consumer of the ChatMessageChannel to finish its work"); + chatMessageChannelConsumerJob.join(); + log.info("Joined the consumer of the ChatMessageChannel"); + } + + @Bean ChatHome kafkaChatHome( ShardingStrategy shardingStrategy,