WIP
authorKai Moritz <kai@juplo.de>
Wed, 13 Sep 2023 18:35:51 +0000 (20:35 +0200)
committerKai Moritz <kai@juplo.de>
Wed, 13 Sep 2023 18:35:51 +0000 (20:35 +0200)
src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomInfo.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeService.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java
src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java

index 33c522d..e91b28c 100644 (file)
@@ -19,4 +19,10 @@ public class ChatRoomInfo
   private final String name;
   @Getter
   private final Integer shard;
+
+
+  public ChatRoomInfo(UUID id, String name)
+  {
+    this(id, name, null);
+  }
 }
index 22c4668..5a1d186 100644 (file)
@@ -30,6 +30,7 @@ public class InfoChannel implements Runnable
   private final long[] startOffset;
   private final long[] currentOffset;
   private final Map<UUID, ChatRoomInfo> chatRoomInfo;
+  private final DataChannel dataChannel;
 
   private boolean running;
 
@@ -37,7 +38,8 @@ public class InfoChannel implements Runnable
   public InfoChannel(
     String topic,
     Producer<String, AbstractMessageTo> producer,
-    Consumer<String, AbstractMessageTo> infoChannelConsumer)
+    Consumer<String, AbstractMessageTo> infoChannelConsumer,
+    DataChannel dataChannel)
   {
     log.debug(
         "Creating InfoChannel for topic {}",
@@ -60,15 +62,16 @@ public class InfoChannel implements Runnable
         .entrySet()
         .stream()
         .forEach(entry -> this.startOffset[entry.getKey().partition()] = entry.getValue());
+
+    this.dataChannel = dataChannel;
   }
 
 
-  Mono<Boolean> loadInProgress()
+  boolean loadInProgress()
   {
-    return Mono
-        .fromSupplier(() -> IntStream
-            .range(0, numShards)
-            .anyMatch(partition -> currentOffset[partition] < startOffset[partition]));
+    return IntStream
+        .range(0, numShards)
+        .anyMatch(partition -> currentOffset[partition] < startOffset[partition]);
   }
 
   Mono<ChatRoomInfo> sendChatRoomCreatedEvent(
@@ -176,6 +179,7 @@ public class InfoChannel implements Runnable
           chatRoomId);
 
       this.chatRoomInfo.put(chatRoomId, chatRoomInfo);
+      this.dataChannel.createChatRoom(chatRoomInfo);
     }
   }
 
index d0b8eff..9832519 100644 (file)
@@ -40,10 +40,7 @@ public class KafkaChatHomeService implements ChatHomeService
   {
     return infoChannel
         .getChatRoomInfo(id)
-        .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(
-            id,
-            shard,
-            dataChannel.getOwnedShards())));
+        .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id)));
   }
 
   @Override
index 97cfa69..f6e39b3 100644 (file)
@@ -5,6 +5,7 @@ import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessage
 import jakarta.annotation.PreDestroy;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.PartitionInfo;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.ApplicationArguments;
 import org.springframework.boot.ApplicationRunner;
@@ -33,21 +34,40 @@ public class KafkaServicesApplicationRunner implements ApplicationRunner
   @Autowired
   ConfigurableApplicationContext context;
 
+  @Autowired
+  InfoChannel infoChannel;
   @Autowired
   DataChannel dataChannel;
   @Autowired
-  Consumer<String, AbstractMessageTo> chatRoomChannelConsumer;
+  Consumer<String, AbstractMessageTo> infoChannelConsumer;
+  @Autowired
+  Consumer<String, AbstractMessageTo> dataChannelConsumer;
 
-  CompletableFuture<Void> chatRoomChannelConsumerJob;
+  CompletableFuture<Void> infoChannelConsumerJob;
+  CompletableFuture<Void> dataChannelConsumerJob;
 
 
   @Override
   public void run(ApplicationArguments args) throws Exception
   {
+    String infoTopic = properties.getKafka().getInfoChannelTopic();
+    List< PartitionInfo> partitions =
+        infoChannelConsumer.partitionsFor(infoTopic);
+    infoChannelConsumer.assignment(partitions);
+    log.info("Starting the consumer for the InfoChannel");
+    infoChannelConsumerJob = taskExecutor
+        .submitCompletable(infoChannel)
+        .exceptionally(e ->
+        {
+          log.error("The consumer for the InfoChannel exited abnormally!", e);
+          return null;
+        });
+
+    while ()
     List<String> topics = List.of(properties.getKafka().getDataChannelTopic());
-    chatRoomChannelConsumer.subscribe(topics, dataChannel);
+    dataChannelConsumer.subscribe(topics, dataChannel);
     log.info("Starting the consumer for the ChatRoomChannel");
-    chatRoomChannelConsumerJob = taskExecutor
+    dataChannelConsumerJob = taskExecutor
         .submitCompletable(dataChannel)
         .exceptionally(e ->
         {
@@ -60,9 +80,9 @@ public class KafkaServicesApplicationRunner implements ApplicationRunner
   public void joinChatRoomChannelConsumerJob()
   {
     log.info("Signaling the consumer of the CahtRoomChannel to quit its work");
-    chatRoomChannelConsumer.wakeup();
+    infoChannelConsumer.wakeup();
     log.info("Waiting for the consumer of the ChatRoomChannel to finish its work");
-    chatRoomChannelConsumerJob.join();
+    dataChannelConsumerJob.join();
     log.info("Joined the consumer of the ChatRoomChannel");
   }
 }
index 0aeefea..caa9228 100644 (file)
@@ -3,7 +3,6 @@ package de.juplo.kafka.chat.backend.implementation.kafka;
 import de.juplo.kafka.chat.backend.ChatBackendProperties;
 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
-import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.CommandCreateChatRoomTo;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
index c87036c..3eb9096 100644 (file)
@@ -36,18 +36,7 @@ public class MongoDbStorageStrategy implements StorageStrategy
         .map(chatRoomTo ->
         {
           UUID chatRoomId = UUID.fromString(chatRoomTo.getId());
-          int shard = shardingStrategy.selectShard(chatRoomId);
-
-          log.info(
-              "{} - old shard: {}, new shard:  {}",
-              chatRoomId,
-              chatRoomTo.getShard(),
-              shard);
-
-          return new ChatRoomInfo(
-              chatRoomId,
-              chatRoomTo.getName(),
-              shard);
+          return new ChatRoomInfo(chatRoomId, chatRoomTo.getName());
         });
   }