From f87d3e2fea3ee107d050bedc18d66471ae0fdd7e Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Thu, 20 Apr 2023 10:47:31 +0200 Subject: [PATCH] NEU --- .../kafka/KafkaServicesApplicationRunner.java | 74 +++++++++++++++++++ .../kafka/KafkaServicesConfiguration.java | 63 +--------------- 2 files changed, 75 insertions(+), 62 deletions(-) create mode 100644 src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesApplicationRunner.java diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesApplicationRunner.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesApplicationRunner.java new file mode 100644 index 00000000..aeec9b14 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesApplicationRunner.java @@ -0,0 +1,74 @@ +package de.juplo.kafka.chat.backend.persistence.kafka; + +import jakarta.annotation.PreDestroy; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.stereotype.Component; + +import java.util.concurrent.CompletableFuture; + + +@ConditionalOnProperty( + prefix = "chat.backend", + name = "services", + havingValue = "kafka") +@Component +@Slf4j +public class KafkaServicesApplicationRunner implements ApplicationRunner +{ + @Autowired + ThreadPoolTaskExecutor taskExecutor; + @Autowired + ConfigurableApplicationContext context; + + @Autowired + ChatMessageChannel chatMessageChannel; + @Autowired + ChatRoomChannel chatRoomChannel; + + CompletableFuture chatRoomChannelConsumerJob; + CompletableFuture chatMessageChannelConsumerJob; + + + @Override + public void run(ApplicationArguments args) throws Exception + { + log.info("Starting the consumer for the ChatRoomChannel"); + chatRoomChannelConsumerJob = taskExecutor + .submitCompletable(chatRoomChannel) + .exceptionally(e -> + { + log.error("The consumer for the ChatRoomChannel exited abnormally!", e); + return null; + }); + log.info("Starting the consumer for the ChatMessageChannel"); + chatMessageChannelConsumerJob = taskExecutor + .submitCompletable(chatMessageChannel) + .exceptionally(e -> + { + log.error("The consumer for the ChatMessageChannel exited abnormally!", e); + return null; + }); + } + + @PreDestroy + public void joinChatRoomChannelConsumerJob() + { + log.info("Waiting for the consumer of the ChatRoomChannel to finish its work"); + chatRoomChannelConsumerJob.join(); + log.info("Joined the consumer of the ChatRoomChannel"); + } + + @PreDestroy + public void joinChatMessageChannelConsumerJob() + { + log.info("Waiting for the consumer of the ChatMessageChannel to finish its work"); + chatMessageChannelConsumerJob.join(); + log.info("Joined the consumer of the ChatMessageChannel"); + } +} diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java index efc86b9e..b7eb711a 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java @@ -4,8 +4,6 @@ import de.juplo.kafka.chat.backend.ChatBackendProperties; import de.juplo.kafka.chat.backend.domain.ChatHome; import de.juplo.kafka.chat.backend.domain.ShardingStrategy; import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy; -import jakarta.annotation.PreDestroy; -import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -16,21 +14,15 @@ import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.ApplicationArguments; -import org.springframework.boot.ApplicationRunner; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.support.serializer.JsonDeserializer; import org.springframework.kafka.support.serializer.JsonSerializer; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.time.Clock; import java.time.ZoneId; import java.util.Properties; -import java.util.concurrent.CompletableFuture; @ConditionalOnProperty( @@ -38,61 +30,8 @@ import java.util.concurrent.CompletableFuture; name = "services", havingValue = "kafka") @Configuration -@Slf4j -public class KafkaServicesConfiguration implements ApplicationRunner +public class KafkaServicesConfiguration { - @Autowired - ThreadPoolTaskExecutor taskExecutor; - @Autowired - ConfigurableApplicationContext context; - - @Autowired - ChatMessageChannel chatMessageChannel; - @Autowired - ChatRoomChannel chatRoomChannel; - - CompletableFuture chatRoomChannelConsumerJob; - CompletableFuture chatMessageChannelConsumerJob; - - - @Override - public void run(ApplicationArguments args) throws Exception - { - log.info("Starting the consumer for the ChatRoomChannel"); - chatRoomChannelConsumerJob = taskExecutor - .submitCompletable(chatRoomChannel) - .exceptionally(e -> - { - log.error("The consumer for the ChatRoomChannel exited abnormally!", e); - return null; - }); - log.info("Starting the consumer for the ChatMessageChannel"); - chatMessageChannelConsumerJob = taskExecutor - .submitCompletable(chatMessageChannel) - .exceptionally(e -> - { - log.error("The consumer for the ChatMessageChannel exited abnormally!", e); - return null; - }); - } - - @PreDestroy - public void joinChatRoomChannelConsumerJob() - { - log.info("Waiting for the consumer of the ChatRoomChannel to finish its work"); - chatRoomChannelConsumerJob.join(); - log.info("Joined the consumer of the ChatRoomChannel"); - } - - @PreDestroy - public void joinChatMessageChannelConsumerJob() - { - log.info("Waiting for the consumer of the ChatMessageChannel to finish its work"); - chatMessageChannelConsumerJob.join(); - log.info("Joined the consumer of the ChatMessageChannel"); - } - - @Bean ChatHome kafkaChatHome( ShardingStrategy shardingStrategy, -- 2.20.1