NEU
authorKai Moritz <kai@juplo.de>
Thu, 20 Apr 2023 08:47:31 +0000 (10:47 +0200)
committerKai Moritz <kai@juplo.de>
Thu, 20 Apr 2023 08:47:31 +0000 (10:47 +0200)
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesApplicationRunner.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java

diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesApplicationRunner.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesApplicationRunner.java
new file mode 100644 (file)
index 0000000..aeec9b1
--- /dev/null
@@ -0,0 +1,74 @@
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import jakarta.annotation.PreDestroy;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.ApplicationArguments;
+import org.springframework.boot.ApplicationRunner;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import org.springframework.stereotype.Component;
+
+import java.util.concurrent.CompletableFuture;
+
+
+@ConditionalOnProperty(
+    prefix = "chat.backend",
+    name = "services",
+    havingValue = "kafka")
+@Component
+@Slf4j
+public class KafkaServicesApplicationRunner implements ApplicationRunner
+{
+  @Autowired
+  ThreadPoolTaskExecutor taskExecutor;
+  @Autowired
+  ConfigurableApplicationContext context;
+
+  @Autowired
+  ChatMessageChannel chatMessageChannel;
+  @Autowired
+  ChatRoomChannel chatRoomChannel;
+
+  CompletableFuture<Void> chatRoomChannelConsumerJob;
+  CompletableFuture<Void> chatMessageChannelConsumerJob;
+
+
+  @Override
+  public void run(ApplicationArguments args) throws Exception
+  {
+    log.info("Starting the consumer for the ChatRoomChannel");
+    chatRoomChannelConsumerJob = taskExecutor
+        .submitCompletable(chatRoomChannel)
+        .exceptionally(e ->
+        {
+          log.error("The consumer for the ChatRoomChannel exited abnormally!", e);
+          return null;
+        });
+    log.info("Starting the consumer for the ChatMessageChannel");
+    chatMessageChannelConsumerJob = taskExecutor
+        .submitCompletable(chatMessageChannel)
+        .exceptionally(e ->
+        {
+          log.error("The consumer for the ChatMessageChannel exited abnormally!", e);
+          return null;
+        });
+  }
+
+  @PreDestroy
+  public void joinChatRoomChannelConsumerJob()
+  {
+    log.info("Waiting for the consumer of the ChatRoomChannel to finish its work");
+    chatRoomChannelConsumerJob.join();
+    log.info("Joined the consumer of the ChatRoomChannel");
+  }
+
+  @PreDestroy
+  public void joinChatMessageChannelConsumerJob()
+  {
+    log.info("Waiting for the consumer of the ChatMessageChannel to finish its work");
+    chatMessageChannelConsumerJob.join();
+    log.info("Joined the consumer of the ChatMessageChannel");
+  }
+}
index efc86b9..b7eb711 100644 (file)
@@ -4,8 +4,6 @@ import de.juplo.kafka.chat.backend.ChatBackendProperties;
 import de.juplo.kafka.chat.backend.domain.ChatHome;
 import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
 import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy;
-import jakarta.annotation.PreDestroy;
-import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -16,21 +14,15 @@ import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.ApplicationArguments;
-import org.springframework.boot.ApplicationRunner;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
-import org.springframework.context.ConfigurableApplicationContext;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.kafka.support.serializer.JsonDeserializer;
 import org.springframework.kafka.support.serializer.JsonSerializer;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 
 import java.time.Clock;
 import java.time.ZoneId;
 import java.util.Properties;
-import java.util.concurrent.CompletableFuture;
 
 
 @ConditionalOnProperty(
@@ -38,61 +30,8 @@ import java.util.concurrent.CompletableFuture;
     name = "services",
     havingValue = "kafka")
 @Configuration
-@Slf4j
-public class KafkaServicesConfiguration implements ApplicationRunner
+public class KafkaServicesConfiguration
 {
-  @Autowired
-  ThreadPoolTaskExecutor taskExecutor;
-  @Autowired
-  ConfigurableApplicationContext context;
-
-  @Autowired
-  ChatMessageChannel chatMessageChannel;
-  @Autowired
-  ChatRoomChannel chatRoomChannel;
-
-  CompletableFuture<Void> chatRoomChannelConsumerJob;
-  CompletableFuture<Void> chatMessageChannelConsumerJob;
-
-
-  @Override
-  public void run(ApplicationArguments args) throws Exception
-  {
-    log.info("Starting the consumer for the ChatRoomChannel");
-    chatRoomChannelConsumerJob = taskExecutor
-        .submitCompletable(chatRoomChannel)
-        .exceptionally(e ->
-        {
-          log.error("The consumer for the ChatRoomChannel exited abnormally!", e);
-          return null;
-        });
-    log.info("Starting the consumer for the ChatMessageChannel");
-    chatMessageChannelConsumerJob = taskExecutor
-        .submitCompletable(chatMessageChannel)
-        .exceptionally(e ->
-        {
-          log.error("The consumer for the ChatMessageChannel exited abnormally!", e);
-          return null;
-        });
-  }
-
-  @PreDestroy
-  public void joinChatRoomChannelConsumerJob()
-  {
-    log.info("Waiting for the consumer of the ChatRoomChannel to finish its work");
-    chatRoomChannelConsumerJob.join();
-    log.info("Joined the consumer of the ChatRoomChannel");
-  }
-
-  @PreDestroy
-  public void joinChatMessageChannelConsumerJob()
-  {
-    log.info("Waiting for the consumer of the ChatMessageChannel to finish its work");
-    chatMessageChannelConsumerJob.join();
-    log.info("Joined the consumer of the ChatMessageChannel");
-  }
-
-
   @Bean
   ChatHome kafkaChatHome(
       ShardingStrategy shardingStrategy,