WIP
authorKai Moritz <kai@juplo.de>
Wed, 13 Sep 2023 19:20:39 +0000 (21:20 +0200)
committerKai Moritz <kai@juplo.de>
Wed, 13 Sep 2023 19:20:39 +0000 (21:20 +0200)
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java
src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java

index 70e41e1..a863353 100644 (file)
@@ -3,13 +3,13 @@ package de.juplo.kafka.chat.backend.implementation.kafka;
 import de.juplo.kafka.chat.backend.ChatBackendProperties;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
 import jakarta.annotation.PreDestroy;
+import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.ApplicationArguments;
 import org.springframework.boot.ApplicationRunner;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
-import org.springframework.context.ConfigurableApplicationContext;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 import org.springframework.stereotype.Component;
 
@@ -21,22 +21,14 @@ import java.util.concurrent.CompletableFuture;
     prefix = "chat.backend",
     name = "services",
     havingValue = "kafka")
-@Component
+@RequiredArgsConstructor
 @Slf4j
 public class KafkaServicesApplicationRunner implements ApplicationRunner
 {
-  @Autowired
-  ChatBackendProperties properties;
-
-  @Autowired
-  ThreadPoolTaskExecutor taskExecutor;
-  @Autowired
-  ConfigurableApplicationContext context;
-
-  @Autowired
-  ChatRoomChannel chatRoomChannel;
-  @Autowired
-  Consumer<String, AbstractMessageTo> chatRoomChannelConsumer;
+  private final ThreadPoolTaskExecutor taskExecutor;
+  private final ChatRoomChannel chatRoomChannel;
+  private final Consumer<String, AbstractMessageTo> chatRoomChannelConsumer;
+  private final WorkAssignor workAssignor;
 
   CompletableFuture<Void> chatRoomChannelConsumerJob;
 
@@ -44,8 +36,7 @@ public class KafkaServicesApplicationRunner implements ApplicationRunner
   @Override
   public void run(ApplicationArguments args) throws Exception
   {
-    List<String> topics = List.of(properties.getKafka().getChatRoomChannelTopic());
-    chatRoomChannelConsumer.subscribe(topics, chatRoomChannel);
+    workAssignor.assignWork(chatRoomChannelConsumer);
     log.info("Starting the consumer for the ChatRoomChannel");
     chatRoomChannelConsumerJob = taskExecutor
         .submitCompletable(chatRoomChannel)
@@ -65,4 +56,10 @@ public class KafkaServicesApplicationRunner implements ApplicationRunner
     chatRoomChannelConsumerJob.join();
     log.info("Joined the consumer of the ChatRoomChannel");
   }
+
+
+  interface WorkAssignor
+  {
+    void assignWork(Consumer<?, ?> consumer);
+  }
 }
index d63111a..373fcb4 100644 (file)
@@ -16,12 +16,15 @@ import org.apache.kafka.common.serialization.StringSerializer;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
+import org.springframework.core.task.TaskExecutor;
 import org.springframework.kafka.support.serializer.JsonDeserializer;
 import org.springframework.kafka.support.serializer.JsonSerializer;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 
 import java.time.Clock;
 import java.time.ZoneId;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
@@ -33,6 +36,24 @@ import java.util.Properties;
 @Configuration
 public class KafkaServicesConfiguration
 {
+  @Bean
+  KafkaServicesApplicationRunner kafkaServicesApplicationRunner(
+      ChatBackendProperties properties,
+      ThreadPoolTaskExecutor taskExecutor,
+      ChatRoomChannel chatRoomChannel,
+      Consumer<String, AbstractMessageTo> chatRoomChannelConsumer)
+  {
+    return new KafkaServicesApplicationRunner(
+        taskExecutor,
+        chatRoomChannel,
+        chatRoomChannelConsumer,
+        consumer ->
+        {
+          List<String> topics = List.of(properties.getKafka().getChatRoomChannelTopic());
+          consumer.subscribe(topics, chatRoomChannel);
+        });
+  }
+
   @Bean
   ChatHomeService kafkaChatHome(
       ChatBackendProperties properties,
index c120e46..20a0936 100644 (file)
@@ -49,8 +49,6 @@ public class KafkaChatHomeServiceTest extends ChatHomeServiceWithShardsTest
 {
   final static String TOPIC = "KAFKA_CHAT_HOME_TEST";
 
-  static CompletableFuture<Void> CONSUMER_JOB;
-
 
   @TestConfiguration
   @EnableConfigurationProperties(ChatBackendProperties.class)