WIP:refactor: Renamed `ChatRoom` into `ChatRoomData` - Aligned Code
authorKai Moritz <kai@juplo.de>
Sun, 3 Sep 2023 21:13:25 +0000 (23:13 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 3 Sep 2023 21:13:25 +0000 (23:13 +0200)
src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MessageRepository.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java

diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MessageRepository.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MessageRepository.java
new file mode 100644 (file)
index 0000000..d80d5fe
--- /dev/null
@@ -0,0 +1,8 @@
+package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
+
+import org.springframework.data.mongodb.repository.MongoRepository;
+
+
+public interface MessageRepository extends MongoRepository<MessageTo, String>
+{
+}
index 772c6e4..41b1d20 100644 (file)
@@ -17,9 +17,11 @@ import java.util.UUID;
 @Slf4j
 public class MongoDbStorageStrategy implements StorageStrategy
 {
-  private final ChatRoomRepository repository;
+  private final ChatRoomRepository chatRoomRepository;
+  private final MessageRepository messageRepository;
   private final Clock clock;
   private final int bufferSize;
+  private final ShardingStrategy shardingStrategy;
   private final ChatRoomServiceFactory factory;
 
 
@@ -28,24 +30,29 @@ public class MongoDbStorageStrategy implements StorageStrategy
   {
     chatRoomInfoFlux
         .map(ChatRoomTo::from)
-        .subscribe(chatroomTo -> repository.save(chatroomTo));
+        .subscribe(chatroomTo -> chatRoomRepository.save(chatroomTo));
   }
 
   @Override
   public Flux<ChatRoomInfo> readChatRoomInfo()
   {
     return Flux
-        .fromIterable(repository.findAll())
+        .fromIterable(chatRoomRepository.findAll())
         .map(chatRoomTo ->
         {
           UUID chatRoomId = UUID.fromString(chatRoomTo.getId());
-          return new ChatRoomData(
-              clock,
-              factory.create(
-                  Flux
-                      .fromIterable(chatRoomTo.getMessages())
-                      .map(messageTo -> messageTo.toMessage())),
-              bufferSize);
+          int shard = shardingStrategy.selectShard(chatRoomId);
+
+          log.info(
+              "{} - old shard: {}, new shard:  {}",
+              chatRoomId,
+              chatRoomTo.getShard(),
+              shard);
+
+          return new ChatRoomInfo(
+              chatRoomId,
+              chatRoomTo.getName(),
+              shard);
         });
   }
 
@@ -53,15 +60,15 @@ public class MongoDbStorageStrategy implements StorageStrategy
   public void writeChatRoomData(Flux<ChatRoomData> chatRoomDataFlux)
   {
     chatRoomDataFlux
-        .map(ChatRoomTo::from)
-        .subscribe(chatroomTo -> repository.save(chatroomTo));
+        .flatMap(ChatRoomTo::from)
+        .subscribe(chatroomTo -> chatRoomRepository.save(chatroomTo));
   }
 
   @Override
   public Flux<ChatRoomData> readChatRoomData()
   {
     return Flux
-        .fromIterable(repository.findAll())
+        .fromIterable(chatRoomRepository.findAll())
         .map(chatRoomTo ->
         {
           UUID chatRoomId = UUID.fromString(chatRoomTo.getId());