refactor: Moved `ShardingStrategy` into package `persistence` -- ALIGNE
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / storage / files / FilesStorageStrategy.java
index e670970..9c79197 100644 (file)
@@ -3,11 +3,12 @@ package de.juplo.kafka.chat.backend.persistence.storage.files;
 import com.fasterxml.jackson.core.JsonGenerator;
 import com.fasterxml.jackson.databind.JavaType;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import de.juplo.kafka.chat.backend.api.ChatRoomTo;
+import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo;
 import de.juplo.kafka.chat.backend.api.MessageTo;
-import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
 import de.juplo.kafka.chat.backend.domain.Message;
 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
+import de.juplo.kafka.chat.backend.persistence.ShardingStrategy;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import reactor.core.publisher.Flux;
@@ -15,7 +16,7 @@ import reactor.core.publisher.Flux;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
-import java.time.Clock;
+import java.util.UUID;
 
 import static java.nio.file.StandardOpenOption.CREATE;
 import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
@@ -29,14 +30,12 @@ public class FilesStorageStrategy implements StorageStrategy
 
 
   private final Path storagePath;
-  private final Clock clock;
-  private final int bufferSize;
-  private final ChatRoomServiceFactory factory;
+  private final ShardingStrategy shardingStrategy;
   private final ObjectMapper mapper;
 
 
   @Override
-  public void writeChatrooms(Flux<ChatRoom> chatroomFlux)
+  public void writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux)
   {
     Path path = chatroomsPath();
     log.info("Writing chatrooms to {}", path);
@@ -49,7 +48,7 @@ public class FilesStorageStrategy implements StorageStrategy
               .getFactory()
               .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
 
-      chatroomFlux
+      chatRoomInfoFlux
           .log()
           .doFirst(() ->
           {
@@ -75,13 +74,12 @@ public class FilesStorageStrategy implements StorageStrategy
               throw new RuntimeException(e);
             }
           })
-          .subscribe(chatroom ->
+          .subscribe(chatRoomInfo ->
           {
             try
             {
-              ChatRoomTo chatroomTo = ChatRoomTo.from(chatroom);
-              generator.writeObject(chatroomTo);
-              writeMessages(chatroomTo, chatroom.getMessages());
+              ChatRoomInfoTo chatRoomInfoTo = ChatRoomInfoTo.from(chatRoomInfo);
+              generator.writeObject(chatRoomInfoTo);
             }
             catch (IOException e)
             {
@@ -96,25 +94,37 @@ public class FilesStorageStrategy implements StorageStrategy
   }
 
   @Override
-  public Flux<ChatRoom> readChatrooms()
+  public Flux<ChatRoomInfo> readChatRoomInfo()
   {
-    JavaType type = mapper.getTypeFactory().constructType(ChatRoomTo.class);
+    JavaType type = mapper.getTypeFactory().constructType(ChatRoomInfoTo.class);
     return Flux
-        .from(new JsonFilePublisher<ChatRoomTo>(chatroomsPath(), mapper, type))
+        .from(new JsonFilePublisher<ChatRoomInfoTo>(chatroomsPath(), mapper, type))
         .log()
-        .map(chatRoomTo -> new ChatRoom(
-            chatRoomTo.getId(),
-            chatRoomTo.getName(),
-            clock,
-            factory.create(readMessages(chatRoomTo)),
-            bufferSize));
+        .map(chatRoomInfoTo ->
+        {
+          UUID chatRoomId = chatRoomInfoTo.getId();
+          int shard = shardingStrategy.selectShard(chatRoomId);
+
+          log.info(
+              "{} - old shard: {}, new shard:  {}",
+              chatRoomId,
+              chatRoomInfoTo.getShard(),
+              shard);
+
+          return new ChatRoomInfo(
+              chatRoomId,
+              chatRoomInfoTo.getName(),
+              shard);
+        });
   }
 
   @Override
-  public void writeMessages(ChatRoomTo chatroomTo, Flux<Message> messageFlux)
+  public void writeChatRoomData(
+      UUID chatRoomId,
+      Flux<Message> messageFlux)
   {
-    Path path = chatroomPath(chatroomTo);
-    log.info("Writing messages for {} to {}", chatroomTo, path);
+    Path path = chatroomPath(chatRoomId);
+    log.info("Writing messages for {} to {}", chatRoomId, path);
     try
     {
       Files.createDirectories(storagePath);
@@ -170,11 +180,11 @@ public class FilesStorageStrategy implements StorageStrategy
   }
 
   @Override
-  public Flux<Message> readMessages(ChatRoomTo chatroomTo)
+  public Flux<Message> readChatRoomData(UUID chatRoomId)
   {
     JavaType type = mapper.getTypeFactory().constructType(MessageTo.class);
     return Flux
-        .from(new JsonFilePublisher<MessageTo>(chatroomPath(chatroomTo), mapper, type))
+        .from(new JsonFilePublisher<MessageTo>(chatroomPath(chatRoomId), mapper, type))
         .log()
         .map(MessageTo::toMessage);
   }
@@ -184,8 +194,8 @@ public class FilesStorageStrategy implements StorageStrategy
     return storagePath.resolve(Path.of(CHATROOMS_FILENAME));
   }
 
-  Path chatroomPath(ChatRoomTo chatroomTo)
+  Path chatroomPath(UUID id)
   {
-    return storagePath.resolve(Path.of(chatroomTo.getId().toString() + ".json"));
+    return storagePath.resolve(Path.of(id.toString() + ".json"));
   }
 }