refactor: Pulled business-logic into class `ShardedChatHome`
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / storage / files / FilesStorageStrategy.java
index 9952117..1e3e5ee 100644 (file)
@@ -1,10 +1,11 @@
-package de.juplo.kafka.chat.backend.persistence.filestorage;
+package de.juplo.kafka.chat.backend.persistence.storage.files;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 import com.fasterxml.jackson.databind.JavaType;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import de.juplo.kafka.chat.backend.api.ChatRoomTo;
 import de.juplo.kafka.chat.backend.api.MessageTo;
+import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
 import de.juplo.kafka.chat.backend.domain.ChatRoom;
 import de.juplo.kafka.chat.backend.domain.Message;
 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
@@ -16,6 +17,7 @@ import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.time.Clock;
+import java.util.UUID;
 
 import static java.nio.file.StandardOpenOption.CREATE;
 import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
@@ -23,7 +25,7 @@ import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
 
 @RequiredArgsConstructor
 @Slf4j
-public class FileStorageStrategy implements StorageStrategy
+public class FilesStorageStrategy implements StorageStrategy
 {
   public static final String CHATROOMS_FILENAME = "chatrooms.json";
 
@@ -31,12 +33,13 @@ public class FileStorageStrategy implements StorageStrategy
   private final Path storagePath;
   private final Clock clock;
   private final int bufferSize;
+  private final ShardingStrategy shardingStrategy;
   private final ChatRoomServiceFactory factory;
   private final ObjectMapper mapper;
 
 
   @Override
-  public void writeChatrooms(Flux<ChatRoom> chatroomFlux)
+  public void write(Flux<ChatRoom> chatroomFlux)
   {
     Path path = chatroomsPath();
     log.info("Writing chatrooms to {}", path);
@@ -96,21 +99,26 @@ public class FileStorageStrategy implements StorageStrategy
   }
 
   @Override
-  public Flux<ChatRoom> readChatrooms()
+  public Flux<ChatRoom> read()
   {
     JavaType type = mapper.getTypeFactory().constructType(ChatRoomTo.class);
     return Flux
         .from(new JsonFilePublisher<ChatRoomTo>(chatroomsPath(), mapper, type))
         .log()
-        .map(chatRoomTo -> new ChatRoom(
-            chatRoomTo.getId(),
-            chatRoomTo.getName(),
-            clock,
-            factory.create(readMessages(chatRoomTo)),
-            bufferSize));
+        .map(chatRoomTo ->
+        {
+          UUID chatRoomId = chatRoomTo.getId();
+          int shard = shardingStrategy.selectShard(chatRoomId);
+          return new ChatRoom(
+              chatRoomTo.getId(),
+              chatRoomTo.getName(),
+              shard,
+              clock,
+              factory.create(readMessages(chatRoomTo)),
+              bufferSize);
+        });
   }
 
-  @Override
   public void writeMessages(ChatRoomTo chatroomTo, Flux<Message> messageFlux)
   {
     Path path = chatroomPath(chatroomTo);
@@ -169,7 +177,6 @@ public class FileStorageStrategy implements StorageStrategy
     }
   }
 
-  @Override
   public Flux<Message> readMessages(ChatRoomTo chatroomTo)
   {
     JavaType type = mapper.getTypeFactory().constructType(MessageTo.class);