WIP
authorKai Moritz <kai@juplo.de>
Wed, 13 Sep 2023 20:53:23 +0000 (22:53 +0200)
committerKai Moritz <kai@juplo.de>
Wed, 13 Sep 2023 20:53:23 +0000 (22:53 +0200)
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java
src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java

index 125227a..d9754a2 100644 (file)
@@ -1,18 +1,15 @@
 package de.juplo.kafka.chat.backend.implementation.kafka;
 
-import de.juplo.kafka.chat.backend.ChatBackendProperties;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
 import jakarta.annotation.PreDestroy;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.common.TopicPartition;
-import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.ApplicationArguments;
 import org.springframework.boot.ApplicationRunner;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
-import org.springframework.stereotype.Component;
 
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -26,9 +23,12 @@ import java.util.concurrent.CompletableFuture;
 @Slf4j
 public class KafkaServicesApplicationRunner implements ApplicationRunner
 {
+  private final String infoTopic;
   private final ThreadPoolTaskExecutor taskExecutor;
+  private final InfoChannel infoChannel;
   private final DataChannel dataChannel;
-  private final Consumer<String, AbstractMessageTo> chatRoomChannelConsumer;
+  private final Consumer<String, AbstractMessageTo> infoChannelConsumer;
+  private final Consumer<String, AbstractMessageTo> dataChannelConsumer;
   private final WorkAssignor workAssignor;
 
   CompletableFuture<Void> infoChannelConsumerJob;
@@ -38,7 +38,6 @@ public class KafkaServicesApplicationRunner implements ApplicationRunner
   @Override
   public void run(ApplicationArguments args) throws Exception
   {
-    String infoTopic = properties.getKafka().getInfoChannelTopic();
     List<TopicPartition> partitions = infoChannelConsumer
         .partitionsFor(infoTopic)
         .stream()
@@ -62,7 +61,7 @@ public class KafkaServicesApplicationRunner implements ApplicationRunner
       Thread.sleep(1000);
     }
 
-    workAssignor.assignWork(chatRoomChannelConsumer);
+    workAssignor.assignWork(dataChannelConsumer);
     log.info("Starting the consumer for the DataChannel");
     dataChannelConsumerJob = taskExecutor
         .submitCompletable(dataChannel)
index 18611d4..75d011f 100644 (file)
@@ -38,15 +38,21 @@ public class KafkaServicesConfiguration
 {
   @Bean
   KafkaServicesApplicationRunner kafkaServicesApplicationRunner(
+      ChatBackendProperties properties,
       ThreadPoolTaskExecutor taskExecutor,
-      ChatRoomChannel chatRoomChannel,
-      Consumer<String, AbstractMessageTo> chatRoomChannelConsumer,
+      InfoChannel infoChannel,
+      DataChannel dataChannel,
+      Consumer<String, AbstractMessageTo> infoChannelConsumer,
+      Consumer<String, AbstractMessageTo> dataChannelConsumer,
       KafkaServicesApplicationRunner.WorkAssignor workAssignor)
   {
     return new KafkaServicesApplicationRunner(
+        properties.getKafka().getInfoChannelTopic(),
         taskExecutor,
-        chatRoomChannel,
-        chatRoomChannelConsumer,
+        infoChannel,
+        dataChannel,
+        infoChannelConsumer,
+        dataChannelConsumer,
         workAssignor);
   }
 
index a4fc063..6bf3a74 100644 (file)
@@ -8,7 +8,6 @@ import org.apache.kafka.common.TopicPartition;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.DefaultApplicationArguments;
 import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
 import org.springframework.boot.autoconfigure.task.TaskExecutionAutoConfiguration;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
@@ -52,12 +51,6 @@ public class KafkaChatHomeServiceTest extends ChatHomeServiceWithShardsTest
   final static String INFO_TOPIC = "KAFKA_CHAT_HOME_TEST_INFO";
   final static String DATA_TOPIC = "KAFKA_CHAT_HOME_TEST_DATA";
 
-<<<<<<< HEAD
-=======
-  static CompletableFuture<Void> INFO_CHANNEL_CONSUMER_JOB;
-  static CompletableFuture<Void> DATA_CHANNEL_CONSUMER_JOB;
-
->>>>>>> 5c8db7f (WIP)
 
   @TestConfiguration
   @EnableConfigurationProperties(ChatBackendProperties.class)
@@ -65,15 +58,14 @@ public class KafkaChatHomeServiceTest extends ChatHomeServiceWithShardsTest
   static class KafkaChatHomeTestConfiguration
   {
     @Bean
-    KafkaServicesApplicationRunner.WorkAssignor workAssignor(
-        ChatRoomChannel chatRoomChannel)
+    KafkaServicesApplicationRunner.WorkAssignor workAssignor(DataChannel dataChannel)
     {
       return consumer ->
       {
         List<TopicPartition> assignedPartitions =
-            List.of(new TopicPartition(TOPIC, 2));
+            List.of(new TopicPartition(DATA_TOPIC, 2));
         consumer.assign(assignedPartitions);
-        chatRoomChannel.onPartitionsAssigned(assignedPartitions);
+        dataChannel.onPartitionsAssigned(assignedPartitions);
       };
     }
 
@@ -110,14 +102,6 @@ public class KafkaChatHomeServiceTest extends ChatHomeServiceWithShardsTest
   @AfterAll
   static void joinConsumerJob(@Autowired KafkaServicesApplicationRunner applicationRunner)
   {
-<<<<<<< HEAD
     applicationRunner.joinChatRoomChannelConsumerJob();
-=======
-    log.info("Signaling the consumer of the CahtRoomChannel to quit its work");
-    chatRoomChannelConsumer.wakeup();
-    log.info("Waiting for the consumer of the ChatRoomChannel to finish its work");
-    DATA_CHANNEL_CONSUMER_JOB.join();
-    log.info("Joined the consumer of the ChatRoomChannel");
->>>>>>> 5c8db7f (WIP)
   }
 }