refactor: Renamed `ChatRoom` into `ChatRoomData` - Aligned Code
authorKai Moritz <kai@juplo.de>
Sun, 3 Sep 2023 23:41:45 +0000 (01:41 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 3 Sep 2023 23:41:45 +0000 (01:41 +0200)
src/main/java/de/juplo/kafka/chat/backend/persistence/StorageStrategy.java
src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java
src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHome.java
src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java
src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MessageRepository.java
src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageConfiguration.java
src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java
src/main/java/de/juplo/kafka/chat/backend/persistence/storage/nostorage/NoStorageStorageConfiguration.java

index 9997b94..c4ab80a 100644 (file)
@@ -1,14 +1,16 @@
 package de.juplo.kafka.chat.backend.persistence;
 
-import de.juplo.kafka.chat.backend.domain.ChatRoomData;
 import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
+import de.juplo.kafka.chat.backend.domain.Message;
 import reactor.core.publisher.Flux;
 
+import java.util.UUID;
+
 
 public interface StorageStrategy
 {
   void writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux);
   Flux<ChatRoomInfo> readChatRoomInfo();
-  void writeChatRoomData(Flux<ChatRoomData> chatRoomDataFlux);
-  Flux<ChatRoomData> readChatRoomData();
+  void writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux);
+  Flux<Message> readChatRoomData(UUID chatRoomId);
 }
index dd8f7d2..76080c9 100644 (file)
@@ -3,6 +3,7 @@ package de.juplo.kafka.chat.backend.persistence.inmemory;
 import de.juplo.kafka.chat.backend.ChatBackendProperties;
 import de.juplo.kafka.chat.backend.domain.ChatHome;
 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
+import de.juplo.kafka.chat.backend.persistence.storage.files.ChatRoomServiceFactory;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
@@ -28,10 +29,12 @@ public class InMemoryServicesConfiguration
   ChatHome noneShardingChatHome(
       ChatBackendProperties properties,
       StorageStrategy storageStrategy,
+      ChatRoomServiceFactory chatRoomServiceFactory,
       Clock clock)
   {
     return new SimpleChatHome(
-        storageStrategy.readChatRoomData(),
+        storageStrategy,
+        chatRoomServiceFactory,
         clock,
         properties.getChatroomBufferSize());
   }
index f0067fa..961cea2 100644 (file)
@@ -1,6 +1,8 @@
 package de.juplo.kafka.chat.backend.persistence.inmemory;
 
 import de.juplo.kafka.chat.backend.domain.*;
+import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
+import de.juplo.kafka.chat.backend.persistence.storage.files.ChatRoomServiceFactory;
 import lombok.extern.slf4j.Slf4j;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -21,23 +23,23 @@ public class SimpleChatHome implements ChatHome
 
 
   public SimpleChatHome(
-      Flux<ChatRoomInfo> chatRoomInfoFlux,
-      ChatRoomService chatRoomService,
+      StorageStrategy storageStrategy,
+      ChatRoomServiceFactory chatRoomServiceFactory,
       Clock clock,
       int bufferSize)
   {
     this(
         null,
-        chatRoomInfoFlux,
-        chatRoomService,
+        storageStrategy,
+        chatRoomServiceFactory,
         clock,
         bufferSize);
   }
 
   public SimpleChatHome(
       Integer shard,
-      Flux<ChatRoomInfo> chatRoomInfoFlux,
-      ChatRoomService chatRoomService,
+      StorageStrategy storageStrategy,
+      ChatRoomServiceFactory chatRoomServiceFactory,
       Clock clock,
       int bufferSize)
   {
@@ -46,7 +48,8 @@ public class SimpleChatHome implements ChatHome
     this.shard = shard;
     this.chatRoomInfo = new HashMap<>();
     this.chatRoomData = new HashMap<>();
-    chatRoomInfoFlux
+    storageStrategy
+        .readChatRoomInfo()
         .filter(info ->
         {
           if (shard == null || info.getShard() == shard)
@@ -65,10 +68,17 @@ public class SimpleChatHome implements ChatHome
         .toStream()
         .forEach(info ->
         {
-          chatRoomInfo.put(info.getId(), info);
+          UUID chatRoomId = info.getId();
+          chatRoomInfo.put(chatRoomId, info);
+          Flux<Message> messageFlux =
+              storageStrategy.readChatRoomData(chatRoomId);
           chatRoomData.put(
               info.getId(),
-              new ChatRoomData(chatRoomService, clock, bufferSize));
+              new ChatRoomData(
+                  chatRoomId,
+                  chatRoomServiceFactory.create(messageFlux),
+                  clock,
+                  bufferSize));
         });
     this.clock = clock;
     this.bufferSize = bufferSize;
@@ -81,7 +91,7 @@ public class SimpleChatHome implements ChatHome
     log.info("Creating ChatRoom with buffer-size {}", bufferSize);
     ChatRoomService service = new InMemoryChatRoomService(Flux.empty());
     ChatRoomInfo chatRoomInfo = new ChatRoomInfo(id, name, shard);
-    ChatRoomData chatRoomData = new ChatRoomData(service, clock, bufferSize);
+    ChatRoomData chatRoomData = new ChatRoomData(id, service, clock, bufferSize);
     this.chatRoomData.put(id, chatRoomData);
     return Mono.just(chatRoomInfo);
   }
index ca0f851..b0bb7d8 100644 (file)
@@ -79,12 +79,12 @@ public class FilesStorageStrategy implements StorageStrategy
               throw new RuntimeException(e);
             }
           })
-          .subscribe(chatroom ->
+          .subscribe(chatRoomInfo ->
           {
             try
             {
-              ChatRoomInfoTo infoTo = ChatRoomInfoTo.from(chatroom);
-              generator.writeObject(infoTo);
+              ChatRoomInfoTo chatRoomInfoTo = ChatRoomInfoTo.from(chatRoomInfo);
+              generator.writeObject(chatRoomInfoTo);
             }
             catch (IOException e)
             {
@@ -124,77 +124,12 @@ public class FilesStorageStrategy implements StorageStrategy
   }
 
   @Override
-  public void writeChatRoomData(Flux<ChatRoomData> chatRoomDataFlux)
+  public void writeChatRoomData(
+      UUID chatRoomId,
+      Flux<Message> messageFlux)
   {
-    Path path = chatroomsPath();
-    log.info("Writing chatrooms to {}", path);
-    try
-    {
-      Files.createDirectories(storagePath);
-
-      JsonGenerator generator =
-          mapper
-              .getFactory()
-              .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
-
-      chatRoomDataFlux
-          .log()
-          .doFirst(() ->
-          {
-            try
-            {
-              generator.useDefaultPrettyPrinter();
-              generator.writeStartArray();
-            }
-            catch (IOException e)
-            {
-              throw new RuntimeException(e);
-            }
-          })
-          .doOnTerminate(() ->
-          {
-            try
-            {
-              generator.writeEndArray();
-              generator.close();
-            }
-            catch (IOException e)
-            {
-              throw new RuntimeException(e);
-            }
-          })
-          .subscribe(chatRoomData -> writeMessages(
-              chatRoomData.getId(),
-              chatRoomData.getMessages()));
-    }
-    catch (IOException e)
-    {
-      throw new RuntimeException(e);
-    }
-  }
-
-  @Override
-  public Flux<ChatRoomData> readChatRoomData()
-  {
-    JavaType type = mapper.getTypeFactory().constructType(ChatRoomInfoTo.class);
-    return Flux
-        .from(new JsonFilePublisher<ChatRoomInfoTo>(chatroomsPath(), mapper, type))
-        .log()
-        .map(infoTo ->
-        {
-          UUID chatRoomId = infoTo.getId();
-          return new ChatRoomData(
-              chatRoomId,
-              factory.create(readMessages(chatRoomId)),
-              clock,
-              bufferSize);
-        });
-  }
-
-  public void writeMessages(UUID id, Flux<Message> messageFlux)
-  {
-    Path path = chatroomPath(id);
-    log.info("Writing messages for {} to {}", id, path);
+    Path path = chatroomPath(chatRoomId);
+    log.info("Writing messages for {} to {}", chatRoomId, path);
     try
     {
       Files.createDirectories(storagePath);
@@ -249,11 +184,12 @@ public class FilesStorageStrategy implements StorageStrategy
     }
   }
 
-  public Flux<Message> readMessages(UUID id)
+  @Override
+  public Flux<Message> readChatRoomData(UUID chatRoomId)
   {
     JavaType type = mapper.getTypeFactory().constructType(MessageTo.class);
     return Flux
-        .from(new JsonFilePublisher<MessageTo>(chatroomPath(id), mapper, type))
+        .from(new JsonFilePublisher<MessageTo>(chatroomPath(chatRoomId), mapper, type))
         .log()
         .map(MessageTo::toMessage);
   }
index d80d5fe..c2a7f0d 100644 (file)
@@ -2,7 +2,10 @@ package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
 
 import org.springframework.data.mongodb.repository.MongoRepository;
 
+import java.util.List;
+
 
 public interface MessageRepository extends MongoRepository<MessageTo, String>
 {
+  List<MessageTo> findByChatRoomId(String chatRoomId);
 }
index 9ef38a7..c2b014e 100644 (file)
@@ -4,6 +4,7 @@ import de.juplo.kafka.chat.backend.ChatBackendProperties;
 import de.juplo.kafka.chat.backend.persistence.inmemory.ShardingStrategy;
 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
 import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomService;
+import de.juplo.kafka.chat.backend.persistence.storage.files.ChatRoomServiceFactory;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
@@ -21,15 +22,18 @@ public class MongoDbStorageConfiguration
   @Bean
   public StorageStrategy storageStrategy(
       ChatRoomRepository chatRoomRepository,
+      MessageRepository messageRepository,
       ChatBackendProperties properties,
       Clock clock,
-      ShardingStrategy shardingStrategy)
+      ShardingStrategy shardingStrategy,
+      ChatRoomServiceFactory chatRoomServiceFactory)
   {
     return new MongoDbStorageStrategy(
         chatRoomRepository,
+        messageRepository,
         clock,
         properties.getChatroomBufferSize(),
         shardingStrategy,
-        messageFlux -> new InMemoryChatRoomService(messageFlux));
+        chatRoomServiceFactory);
   }
 }
index 41b1d20..8a172c1 100644 (file)
@@ -2,6 +2,7 @@ package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
 
 import de.juplo.kafka.chat.backend.domain.ChatRoomData;
 import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
+import de.juplo.kafka.chat.backend.domain.Message;
 import de.juplo.kafka.chat.backend.persistence.inmemory.ShardingStrategy;
 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
 import de.juplo.kafka.chat.backend.persistence.storage.files.ChatRoomServiceFactory;
@@ -57,29 +58,18 @@ public class MongoDbStorageStrategy implements StorageStrategy
   }
 
   @Override
-  public void writeChatRoomData(Flux<ChatRoomData> chatRoomDataFlux)
+  public void writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux)
   {
-    chatRoomDataFlux
-        .flatMap(ChatRoomTo::from)
-        .subscribe(chatroomTo -> chatRoomRepository.save(chatroomTo));
+    messageFlux
+        .map(message -> MessageTo.from(message))
+        .subscribe(messageTo -> messageRepository.save(messageTo)); // TODO: Namespace <chatRoomId>
   }
 
   @Override
-  public Flux<ChatRoomData> readChatRoomData()
+  public Flux<Message> readChatRoomData(UUID chatRoomId)
   {
     return Flux
-        .fromIterable(chatRoomRepository.findAll())
-        .map(chatRoomTo ->
-        {
-          UUID chatRoomId = UUID.fromString(chatRoomTo.getId());
-          int shard = shardingStrategy.selectShard(chatRoomId);
-          return new ChatRoomData(
-              clock,
-              factory.create(
-                  Flux
-                      .fromIterable(chatRoomTo.getMessages())
-                      .map(messageTo -> messageTo.toMessage())),
-              bufferSize);
-        });
+        .fromIterable(messageRepository.findByChatRoomId(chatRoomId.toString()))
+        .map(messageTo -> messageTo.toMessage());
   }
 }
index 84ac8de..ab24bb8 100644 (file)
@@ -1,7 +1,7 @@
 package de.juplo.kafka.chat.backend.persistence.storage.nostorage;
 
-import de.juplo.kafka.chat.backend.domain.ChatRoomData;
 import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
+import de.juplo.kafka.chat.backend.domain.Message;
 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
 import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
@@ -11,6 +11,8 @@ import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import reactor.core.publisher.Flux;
 
+import java.util.UUID;
+
 
 @ConditionalOnProperty(
     prefix = "chat.backend.inmemory",
@@ -39,10 +41,10 @@ public class NoStorageStorageConfiguration
       }
 
       @Override
-      public void writeChatRoomData(Flux<ChatRoomData> chatRoomDataFlux) {}
+      public void writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux) {}
 
       @Override
-      public Flux<ChatRoomData> readChatRoomData()
+      public Flux<Message> readChatRoomData(UUID chatRoomId)
       {
         return Flux.empty();
       }