WIP:refactor: Renamed `ChatRoom` into `ChatRoomData` - Aligned Code
authorKai Moritz <kai@juplo.de>
Sun, 3 Sep 2023 20:52:19 +0000 (22:52 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 3 Sep 2023 20:52:19 +0000 (22:52 +0200)
src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomData.java
src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java
src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/ChatRoomTo.java
src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java

index cf44802..0031bb0 100644 (file)
@@ -1,5 +1,6 @@
 package de.juplo.kafka.chat.backend.domain;
 
+import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -8,6 +9,7 @@ import reactor.core.publisher.SynchronousSink;
 
 import java.time.Clock;
 import java.time.LocalDateTime;
+import java.util.UUID;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -17,6 +19,8 @@ public class ChatRoomData
 {
   public final static Pattern VALID_USER = Pattern.compile("^[a-z0-9-]{2,}$");
 
+  @Getter
+  private final UUID id;
   private final ChatRoomService service;
   private final Clock clock;
   private final int bufferSize;
@@ -24,11 +28,13 @@ public class ChatRoomData
 
 
   public ChatRoomData(
+      UUID id,
       ChatRoomService service,
       Clock clock,
       int bufferSize)
   {
-    log.info("Created ChatRoom with buffer-size {}", bufferSize);
+    log.info("Created ChatRoom {id} with buffer-size {}", id, bufferSize);
+    this.id = id;
     this.service = service;
     this.clock = clock;
     this.bufferSize = bufferSize;
index 860d9ff..bcad3f3 100644 (file)
@@ -6,7 +6,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo;
 import de.juplo.kafka.chat.backend.api.MessageTo;
 import de.juplo.kafka.chat.backend.domain.ChatRoomData;
-import de.juplo.kafka.chat.backend.persistence.inmemory.ShardingStrategy;
+import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
 import de.juplo.kafka.chat.backend.domain.Message;
 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
 import lombok.RequiredArgsConstructor;
@@ -33,13 +33,12 @@ public class FilesStorageStrategy implements StorageStrategy
   private final Path storagePath;
   private final Clock clock;
   private final int bufferSize;
-  private final ShardingStrategy shardingStrategy;
   private final ChatRoomServiceFactory factory;
   private final ObjectMapper mapper;
 
 
   @Override
-  public void writeChatRoomData(Flux<ChatRoomData> chatRoomDataFlux)
+  public void writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux)
   {
     Path path = chatroomsPath();
     log.info("Writing chatrooms to {}", path);
@@ -52,7 +51,7 @@ public class FilesStorageStrategy implements StorageStrategy
               .getFactory()
               .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
 
-      chatRoomDataFlux
+      chatRoomInfoFlux
           .log()
           .doFirst(() ->
           {
@@ -84,7 +83,6 @@ public class FilesStorageStrategy implements StorageStrategy
             {
               ChatRoomInfoTo infoTo = ChatRoomInfoTo.from(chatroom);
               generator.writeObject(infoTo);
-              writeMessages(infoTo, chatroom.getMessages());
             }
             catch (IOException e)
             {
@@ -98,6 +96,69 @@ public class FilesStorageStrategy implements StorageStrategy
     }
   }
 
+  @Override
+  public Flux<ChatRoomInfo> readChatRoomInfo()
+  {
+    JavaType type = mapper.getTypeFactory().constructType(ChatRoomInfoTo.class);
+    return Flux
+        .from(new JsonFilePublisher<ChatRoomInfoTo>(chatroomsPath(), mapper, type))
+        .log()
+        .map(infoTo -> new ChatRoomInfo(
+            infoTo.getId(),
+            infoTo.getName(),
+            infoTo.getShard()));
+  }
+
+  @Override
+  public void writeChatRoomData(Flux<ChatRoomData> chatRoomDataFlux)
+  {
+    Path path = chatroomsPath();
+    log.info("Writing chatrooms to {}", path);
+    try
+    {
+      Files.createDirectories(storagePath);
+
+      JsonGenerator generator =
+          mapper
+              .getFactory()
+              .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
+
+      chatRoomDataFlux
+          .log()
+          .doFirst(() ->
+          {
+            try
+            {
+              generator.useDefaultPrettyPrinter();
+              generator.writeStartArray();
+            }
+            catch (IOException e)
+            {
+              throw new RuntimeException(e);
+            }
+          })
+          .doOnTerminate(() ->
+          {
+            try
+            {
+              generator.writeEndArray();
+              generator.close();
+            }
+            catch (IOException e)
+            {
+              throw new RuntimeException(e);
+            }
+          })
+          .subscribe(chatRoomData -> writeMessages(
+              chatRoomData.getId(),
+              chatRoomData.getMessages()));
+    }
+    catch (IOException e)
+    {
+      throw new RuntimeException(e);
+    }
+  }
+
   @Override
   public Flux<ChatRoomData> readChatRoomData()
   {
@@ -108,18 +169,18 @@ public class FilesStorageStrategy implements StorageStrategy
         .map(infoTo ->
         {
           UUID chatRoomId = infoTo.getId();
-          int shard = shardingStrategy.selectShard(chatRoomId);
           return new ChatRoomData(
+              chatRoomId,
+              factory.create(readMessages(chatRoomId)),
               clock,
-              factory.create(readMessages(infoTo)),
               bufferSize);
         });
   }
 
-  public void writeMessages(ChatRoomInfoTo infoTo, Flux<Message> messageFlux)
+  public void writeMessages(UUID id, Flux<Message> messageFlux)
   {
-    Path path = chatroomPath(infoTo);
-    log.info("Writing messages for {} to {}", infoTo, path);
+    Path path = chatroomPath(id);
+    log.info("Writing messages for {} to {}", id, path);
     try
     {
       Files.createDirectories(storagePath);
@@ -174,11 +235,11 @@ public class FilesStorageStrategy implements StorageStrategy
     }
   }
 
-  public Flux<Message> readMessages(ChatRoomInfoTo infoTo)
+  public Flux<Message> readMessages(UUID id)
   {
     JavaType type = mapper.getTypeFactory().constructType(MessageTo.class);
     return Flux
-        .from(new JsonFilePublisher<MessageTo>(chatroomPath(infoTo), mapper, type))
+        .from(new JsonFilePublisher<MessageTo>(chatroomPath(id), mapper, type))
         .log()
         .map(MessageTo::toMessage);
   }
@@ -188,8 +249,8 @@ public class FilesStorageStrategy implements StorageStrategy
     return storagePath.resolve(Path.of(CHATROOMS_FILENAME));
   }
 
-  Path chatroomPath(ChatRoomInfoTo infoTo)
+  Path chatroomPath(UUID id)
   {
-    return storagePath.resolve(Path.of(infoTo.getId().toString() + ".json"));
+    return storagePath.resolve(Path.of(id.toString() + ".json"));
   }
 }
index f3fae32..58f5bce 100644 (file)
@@ -6,34 +6,26 @@ import lombok.*;
 import org.springframework.data.annotation.Id;
 import org.springframework.data.mongodb.core.mapping.Document;
 
-import java.util.List;
-
 
 @AllArgsConstructor
 @NoArgsConstructor
 @Getter(AccessLevel.PACKAGE)
 @Setter(AccessLevel.PACKAGE)
 @EqualsAndHashCode(of = { "id" })
-@ToString(of = { "id", "name" })
+@ToString(of = { "id", "shard", "name" })
 @Document
 public class ChatRoomTo
 {
   @Id
   private String id;
+  private Integer shard;
   private String name;
-  private List<MessageTo> messages;
 
-  public static ChatRoomTo from(
-      ChatRoomInfo chatRoomInfo,
-      ChatRoomData chatRoomData)
+  public static ChatRoomTo from(ChatRoomInfo chatRoomInfo)
   {
     return new ChatRoomTo(
         chatRoomInfo.getId().toString(),
-        chatRoomInfo.getName(),
-        chatRoomData
-            .getMessages()
-            .map(MessageTo::from)
-            .collectList()
-            .block());
+        chatRoomInfo.getShard(),
+        chatRoomInfo.getName());
   }
 }
index 7952a2b..772c6e4 100644 (file)
@@ -1,6 +1,7 @@
 package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
 
 import de.juplo.kafka.chat.backend.domain.ChatRoomData;
+import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
 import de.juplo.kafka.chat.backend.persistence.inmemory.ShardingStrategy;
 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
 import de.juplo.kafka.chat.backend.persistence.storage.files.ChatRoomServiceFactory;
@@ -19,10 +20,35 @@ public class MongoDbStorageStrategy implements StorageStrategy
   private final ChatRoomRepository repository;
   private final Clock clock;
   private final int bufferSize;
-  private final ShardingStrategy shardingStrategy;
   private final ChatRoomServiceFactory factory;
 
 
+  @Override
+  public void writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux)
+  {
+    chatRoomInfoFlux
+        .map(ChatRoomTo::from)
+        .subscribe(chatroomTo -> repository.save(chatroomTo));
+  }
+
+  @Override
+  public Flux<ChatRoomInfo> readChatRoomInfo()
+  {
+    return Flux
+        .fromIterable(repository.findAll())
+        .map(chatRoomTo ->
+        {
+          UUID chatRoomId = UUID.fromString(chatRoomTo.getId());
+          return new ChatRoomData(
+              clock,
+              factory.create(
+                  Flux
+                      .fromIterable(chatRoomTo.getMessages())
+                      .map(messageTo -> messageTo.toMessage())),
+              bufferSize);
+        });
+  }
+
   @Override
   public void writeChatRoomData(Flux<ChatRoomData> chatRoomDataFlux)
   {