refactor: Moved `ShardingStrategy` into package `persistence` -- ALIGNE
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / storage / mongodb / MongoDbStorageStrategy.java
index 2300219..644ab88 100644 (file)
@@ -1,13 +1,13 @@
 package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
 
-import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
+import de.juplo.kafka.chat.backend.domain.Message;
+import de.juplo.kafka.chat.backend.persistence.ShardingStrategy;
 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
-import de.juplo.kafka.chat.backend.persistence.storage.files.ChatRoomServiceFactory;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import reactor.core.publisher.Flux;
 
-import java.time.Clock;
 import java.util.UUID;
 
 
@@ -15,37 +15,55 @@ import java.util.UUID;
 @Slf4j
 public class MongoDbStorageStrategy implements StorageStrategy
 {
-  private final ChatRoomRepository repository;
-  private final Clock clock;
-  private final int bufferSize;
-  private final ChatRoomServiceFactory factory;
+  private final ChatRoomRepository chatRoomRepository;
+  private final MessageRepository messageRepository;
+  private final ShardingStrategy shardingStrategy;
 
 
   @Override
-  public void write(Flux<ChatRoom> chatroomFlux)
+  public void writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux)
   {
-    chatroomFlux
+    chatRoomInfoFlux
         .map(ChatRoomTo::from)
-        .subscribe(chatroomTo -> repository.save(chatroomTo));
+        .subscribe(chatroomTo -> chatRoomRepository.save(chatroomTo));
   }
 
   @Override
-  public Flux<ChatRoom> read()
+  public Flux<ChatRoomInfo> readChatRoomInfo()
   {
     return Flux
-        .fromIterable(repository.findAll())
+        .fromIterable(chatRoomRepository.findAll())
         .map(chatRoomTo ->
         {
           UUID chatRoomId = UUID.fromString(chatRoomTo.getId());
-          return new ChatRoom(
+          int shard = shardingStrategy.selectShard(chatRoomId);
+
+          log.info(
+              "{} - old shard: {}, new shard:  {}",
+              chatRoomId,
+              chatRoomTo.getShard(),
+              shard);
+
+          return new ChatRoomInfo(
               chatRoomId,
               chatRoomTo.getName(),
-              clock,
-              factory.create(
-                  Flux
-                      .fromIterable(chatRoomTo.getMessages())
-                      .map(messageTo -> messageTo.toMessage())),
-              bufferSize);
+              shard);
         });
   }
+
+  @Override
+  public void writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux)
+  {
+    messageFlux
+        .map(message -> MessageTo.from(chatRoomId, message))
+        .subscribe(messageTo -> messageRepository.save(messageTo));
+  }
+
+  @Override
+  public Flux<Message> readChatRoomData(UUID chatRoomId)
+  {
+    return Flux
+        .fromIterable(messageRepository.findByChatRoomIdOrderBySerialAsc(chatRoomId.toString()))
+        .map(messageTo -> messageTo.toMessage());
+  }
 }