refactore: Renamed `PersistenceStrategy` to `ChatroomService` -- Rename
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / LocalJsonFilesStorageStrategy.java
index 40fa2bc..6c61908 100644 (file)
@@ -1,27 +1,19 @@
 package de.juplo.kafka.chat.backend.persistence;
 
 import com.fasterxml.jackson.core.JsonGenerator;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.JavaType;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import de.juplo.kafka.chat.backend.api.ChatroomTo;
 import de.juplo.kafka.chat.backend.api.MessageTo;
 import de.juplo.kafka.chat.backend.domain.Chatroom;
-import de.juplo.kafka.chat.backend.domain.ChatroomFactory;
 import de.juplo.kafka.chat.backend.domain.Message;
-import de.juplo.kafka.chat.backend.domain.MessageMutationException;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import reactor.core.publisher.Flux;
-import reactor.core.publisher.Sinks;
 
 import java.io.IOException;
 import java.nio.file.Files;
-import java.nio.file.NoSuchFileException;
 import java.nio.file.Path;
-import java.util.LinkedHashMap;
-import java.util.function.Function;
-import java.util.stream.Collector;
-import java.util.stream.Collectors;
 
 import static java.nio.file.StandardOpenOption.CREATE;
 import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
@@ -36,7 +28,7 @@ public class LocalJsonFilesStorageStrategy implements StorageStrategy
 
   private final Path storagePath;
   private final ObjectMapper mapper;
-  private final ChatroomFactory chatroomFactory;
+  private final InMemoryChatroomFactory chatroomFactory;
 
 
   @Override
@@ -83,9 +75,9 @@ public class LocalJsonFilesStorageStrategy implements StorageStrategy
           {
             try
             {
-              ChatroomInfo chatroomInfo = ChatroomInfo.from(chatroom);
-              generator.writeObject(chatroomInfo);
-              writeMessages(chatroomInfo, chatroom.getMessages());
+              ChatroomTo chatroomTo = ChatroomTo.from(chatroom);
+              generator.writeObject(chatroomTo);
+              writeMessages(chatroomTo, chatroom.getMessages());
             }
             catch (IOException e)
             {
@@ -102,66 +94,26 @@ public class LocalJsonFilesStorageStrategy implements StorageStrategy
   @Override
   public Flux<Chatroom> readChatrooms()
   {
-    Path path = chatroomsPath();
-    log.info("Reading chatrooms from {}", path);
-    try
-    {
-      JsonParser parser =
-          mapper
-              .getFactory()
-              .createParser(Files.newBufferedReader(path));
-
-      if (parser.nextToken() != JsonToken.START_ARRAY)
-        throw new IllegalStateException("Expected content to be an array");
-
-      Sinks.Many<ChatroomInfo> many = Sinks.many().unicast().onBackpressureBuffer();
-
-      while (parser.nextToken() != JsonToken.END_ARRAY)
-      {
-        many
-            .tryEmitNext(mapper.readValue(parser, ChatroomInfo.class))
-            .orThrow();
-      }
-
-      many.tryEmitComplete().orThrow();
-
-      return many
-          .asFlux()
-          .map(chatroomInfo ->
-          {
-            LinkedHashMap<Message.MessageKey, Message> messages =
-                readMessages(chatroomInfo)
-                    .collect(Collectors.toMap(
-                        Message::getKey,
-                        Function.identity(),
-                        (existing, message) ->
-                        {
-                          if (!message.equals(existing))
-                            throw new MessageMutationException(message, existing);
-                          return existing;
-                        },
-                        LinkedHashMap::new))
-                    .block();
-            InMemoryPersistenceStrategy strategy = new InMemoryPersistenceStrategy(messages);
-            return chatroomFactory.restoreChatroom(chatroomInfo.getId(), chatroomInfo.getName(), strategy);
-          });
-    }
-    catch (NoSuchFileException e)
-    {
-      log.info("{} does not exist - starting with empty ChatHome", path);
-      return Flux.empty();
-    }
-    catch (IOException e)
-    {
-      throw new RuntimeException(e);
-    }
+    JavaType type = mapper.getTypeFactory().constructType(ChatroomTo.class);
+    return Flux
+        .from(new JsonFilePublisher<ChatroomTo>(chatroomsPath(), mapper, type))
+        .log()
+        .map(chatroomTo ->
+        {
+          InMemoryChatroomService chatroomService =
+              new InMemoryChatroomService(readMessages(chatroomTo));
+          return chatroomFactory.restoreChatroom(
+              chatroomTo.getId(),
+              chatroomTo.getName(),
+              chatroomService);
+        });
   }
 
   @Override
-  public void writeMessages(ChatroomInfo chatroomInfo, Flux<Message> messageFlux)
+  public void writeMessages(ChatroomTo chatroomTo, Flux<Message> messageFlux)
   {
-    Path path = chatroomPath(chatroomInfo);
-    log.info("Writing messages for {} to {}", chatroomInfo, path);
+    Path path = chatroomPath(chatroomTo);
+    log.info("Writing messages for {} to {}", chatroomTo, path);
     try
     {
       Files.createDirectories(storagePath);
@@ -217,45 +169,13 @@ public class LocalJsonFilesStorageStrategy implements StorageStrategy
   }
 
   @Override
-  public Flux<Message> readMessages(ChatroomInfo chatroomInfo)
+  public Flux<Message> readMessages(ChatroomTo chatroomTo)
   {
-    Path path = chatroomPath(chatroomInfo);
-    log.info("Reading messages for {} from {}", chatroomInfo, path);
-    try
-    {
-      JsonParser parser =
-          mapper
-              .getFactory()
-              .createParser(Files.newBufferedReader(path));
-
-      if (parser.nextToken() != JsonToken.START_ARRAY)
-        throw new IllegalStateException("Expected content to be an array");
-
-      Sinks.Many<Message> many = Sinks.many().unicast().onBackpressureBuffer();
-
-      while (parser.nextToken() != JsonToken.END_ARRAY)
-      {
-        many
-            .tryEmitNext(mapper.readValue(parser, MessageTo.class).toMessage())
-            .orThrow();
-      }
-
-      many.tryEmitComplete().orThrow();
-
-      return many.asFlux();
-    }
-    catch (NoSuchFileException e)
-    {
-      log.info(
-          "{} does not exist - starting with empty chat for {}",
-          path,
-          chatroomInfo);
-      return Flux.empty();
-    }
-    catch (IOException e)
-    {
-      throw new RuntimeException(e);
-    }
+    JavaType type = mapper.getTypeFactory().constructType(MessageTo.class);
+    return Flux
+        .from(new JsonFilePublisher<MessageTo>(chatroomPath(chatroomTo), mapper, type))
+        .log()
+        .map(MessageTo::toMessage);
   }
 
   Path chatroomsPath()
@@ -263,8 +183,8 @@ public class LocalJsonFilesStorageStrategy implements StorageStrategy
     return storagePath.resolve(Path.of(CHATROOMS_FILENAME));
   }
 
-  Path chatroomPath(ChatroomInfo chatroomInfo)
+  Path chatroomPath(ChatroomTo chatroomTo)
   {
-    return storagePath.resolve(Path.of(chatroomInfo.getId().toString() + ".json"));
+    return storagePath.resolve(Path.of(chatroomTo.getId().toString() + ".json"));
   }
 }