import de.juplo.kafka.chat.backend.ChatBackendProperties;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
 import jakarta.annotation.PreDestroy;
+import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.ApplicationArguments;
 import org.springframework.boot.ApplicationRunner;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
-import org.springframework.context.ConfigurableApplicationContext;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 import org.springframework.stereotype.Component;
 
     prefix = "chat.backend",
     name = "services",
     havingValue = "kafka")
-@Component
+@RequiredArgsConstructor
 @Slf4j
 public class KafkaServicesApplicationRunner implements ApplicationRunner
 {
-  @Autowired
-  ChatBackendProperties properties;
-
-  @Autowired
-  ThreadPoolTaskExecutor taskExecutor;
-  @Autowired
-  ConfigurableApplicationContext context;
-
-  @Autowired
-  ChatRoomChannel chatRoomChannel;
-  @Autowired
-  Consumer<String, AbstractMessageTo> chatRoomChannelConsumer;
+  private final ThreadPoolTaskExecutor taskExecutor;
+  private final ChatRoomChannel chatRoomChannel;
+  private final Consumer<String, AbstractMessageTo> chatRoomChannelConsumer;
+  private final WorkAssignor workAssignor;
 
   CompletableFuture<Void> chatRoomChannelConsumerJob;
 
   @Override
   public void run(ApplicationArguments args) throws Exception
   {
-    List<String> topics = List.of(properties.getKafka().getChatRoomChannelTopic());
-    chatRoomChannelConsumer.subscribe(topics, chatRoomChannel);
+    workAssignor.assignWork(chatRoomChannelConsumer);
     log.info("Starting the consumer for the ChatRoomChannel");
     chatRoomChannelConsumerJob = taskExecutor
         .submitCompletable(chatRoomChannel)
     chatRoomChannelConsumerJob.join();
     log.info("Joined the consumer of the ChatRoomChannel");
   }
+
+
+  interface WorkAssignor
+  {
+    void assignWork(Consumer<?, ?> consumer);
+  }
 }
 
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
+import org.springframework.core.task.TaskExecutor;
 import org.springframework.kafka.support.serializer.JsonDeserializer;
 import org.springframework.kafka.support.serializer.JsonSerializer;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 
 import java.time.Clock;
 import java.time.ZoneId;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
 @Configuration
 public class KafkaServicesConfiguration
 {
+  @Bean
+  KafkaServicesApplicationRunner kafkaServicesApplicationRunner(
+      ChatBackendProperties properties,
+      ThreadPoolTaskExecutor taskExecutor,
+      ChatRoomChannel chatRoomChannel,
+      Consumer<String, AbstractMessageTo> chatRoomChannelConsumer)
+  {
+    return new KafkaServicesApplicationRunner(
+        taskExecutor,
+        chatRoomChannel,
+        chatRoomChannelConsumer,
+        consumer ->
+        {
+          List<String> topics = List.of(properties.getKafka().getChatRoomChannelTopic());
+          consumer.subscribe(topics, chatRoomChannel);
+        });
+  }
+
   @Bean
   ChatHomeService kafkaChatHome(
       ChatBackendProperties properties,