refactore: Renamed `PersistenceStrategy` to `ChatroomService` -- Rename
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / LocalJsonFilesStorageStrategy.java
index a90e2eb..6c61908 100644 (file)
@@ -1,27 +1,19 @@
 package de.juplo.kafka.chat.backend.persistence;
 
 import com.fasterxml.jackson.core.JsonGenerator;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.JavaType;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import de.juplo.kafka.chat.backend.api.ChatroomTo;
 import de.juplo.kafka.chat.backend.api.MessageTo;
 import de.juplo.kafka.chat.backend.domain.Chatroom;
-import de.juplo.kafka.chat.backend.domain.ChatroomFactory;
 import de.juplo.kafka.chat.backend.domain.Message;
-import de.juplo.kafka.chat.backend.domain.MessageMutationException;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import reactor.core.publisher.Flux;
-import reactor.core.publisher.Sinks;
 
 import java.io.IOException;
 import java.nio.file.Files;
-import java.nio.file.NoSuchFileException;
 import java.nio.file.Path;
-import java.util.LinkedHashMap;
-import java.util.function.Function;
-import java.util.stream.Collectors;
 
 import static java.nio.file.StandardOpenOption.CREATE;
 import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
@@ -36,7 +28,7 @@ public class LocalJsonFilesStorageStrategy implements StorageStrategy
 
   private final Path storagePath;
   private final ObjectMapper mapper;
-  private final ChatroomFactory chatroomFactory;
+  private final InMemoryChatroomFactory chatroomFactory;
 
 
   @Override
@@ -102,59 +94,19 @@ public class LocalJsonFilesStorageStrategy implements StorageStrategy
   @Override
   public Flux<Chatroom> readChatrooms()
   {
-    Path path = chatroomsPath();
-    log.info("Reading chatrooms from {}", path);
-    try
-    {
-      JsonParser parser =
-          mapper
-              .getFactory()
-              .createParser(Files.newBufferedReader(path));
-
-      if (parser.nextToken() != JsonToken.START_ARRAY)
-        throw new IllegalStateException("Expected content to be an array");
-
-      Sinks.Many<ChatroomTo> many = Sinks.many().unicast().onBackpressureBuffer();
-
-      while (parser.nextToken() != JsonToken.END_ARRAY)
-      {
-        many
-            .tryEmitNext(mapper.readValue(parser, ChatroomTo.class))
-            .orThrow();
-      }
-
-      many.tryEmitComplete().orThrow();
-
-      return many
-          .asFlux()
-          .map(chatroomTo ->
-          {
-            LinkedHashMap<Message.MessageKey, Message> messages =
-                readMessages(chatroomTo)
-                    .collect(Collectors.toMap(
-                        Message::getKey,
-                        Function.identity(),
-                        (existing, message) ->
-                        {
-                          if (!message.equals(existing))
-                            throw new MessageMutationException(message, existing);
-                          return existing;
-                        },
-                        LinkedHashMap::new))
-                    .block();
-            InMemoryPersistenceStrategy strategy = new InMemoryPersistenceStrategy(messages);
-            return chatroomFactory.restoreChatroom(chatroomTo.getId(), chatroomTo.getName(), strategy);
-          });
-    }
-    catch (NoSuchFileException e)
-    {
-      log.info("{} does not exist - starting with empty ChatHome", path);
-      return Flux.empty();
-    }
-    catch (IOException e)
-    {
-      throw new RuntimeException(e);
-    }
+    JavaType type = mapper.getTypeFactory().constructType(ChatroomTo.class);
+    return Flux
+        .from(new JsonFilePublisher<ChatroomTo>(chatroomsPath(), mapper, type))
+        .log()
+        .map(chatroomTo ->
+        {
+          InMemoryChatroomService chatroomService =
+              new InMemoryChatroomService(readMessages(chatroomTo));
+          return chatroomFactory.restoreChatroom(
+              chatroomTo.getId(),
+              chatroomTo.getName(),
+              chatroomService);
+        });
   }
 
   @Override
@@ -219,43 +171,11 @@ public class LocalJsonFilesStorageStrategy implements StorageStrategy
   @Override
   public Flux<Message> readMessages(ChatroomTo chatroomTo)
   {
-    Path path = chatroomPath(chatroomTo);
-    log.info("Reading messages for {} from {}", chatroomTo, path);
-    try
-    {
-      JsonParser parser =
-          mapper
-              .getFactory()
-              .createParser(Files.newBufferedReader(path));
-
-      if (parser.nextToken() != JsonToken.START_ARRAY)
-        throw new IllegalStateException("Expected content to be an array");
-
-      Sinks.Many<Message> many = Sinks.many().unicast().onBackpressureBuffer();
-
-      while (parser.nextToken() != JsonToken.END_ARRAY)
-      {
-        many
-            .tryEmitNext(mapper.readValue(parser, MessageTo.class).toMessage())
-            .orThrow();
-      }
-
-      many.tryEmitComplete().orThrow();
-
-      return many.asFlux();
-    }
-    catch (NoSuchFileException e)
-    {
-      log.info(
-          "{} does not exist - starting with empty chat for {}",
-          path,
-          chatroomTo);
-      return Flux.empty();
-    }
-    catch (IOException e)
-    {
-      throw new RuntimeException(e);
-    }
+    JavaType type = mapper.getTypeFactory().constructType(MessageTo.class);
+    return Flux
+        .from(new JsonFilePublisher<MessageTo>(chatroomPath(chatroomTo), mapper, type))
+        .log()
+        .map(MessageTo::toMessage);
   }
 
   Path chatroomsPath()