feat: The size buffer for listeners to a chatroom is configurable
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / LocalJsonFilesStorageStrategy.java
index 40fa2bc..a90e2eb 100644 (file)
@@ -4,6 +4,7 @@ import com.fasterxml.jackson.core.JsonGenerator;
 import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.core.JsonToken;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import de.juplo.kafka.chat.backend.api.ChatroomTo;
 import de.juplo.kafka.chat.backend.api.MessageTo;
 import de.juplo.kafka.chat.backend.domain.Chatroom;
 import de.juplo.kafka.chat.backend.domain.ChatroomFactory;
@@ -20,7 +21,6 @@ import java.nio.file.NoSuchFileException;
 import java.nio.file.Path;
 import java.util.LinkedHashMap;
 import java.util.function.Function;
-import java.util.stream.Collector;
 import java.util.stream.Collectors;
 
 import static java.nio.file.StandardOpenOption.CREATE;
@@ -83,9 +83,9 @@ public class LocalJsonFilesStorageStrategy implements StorageStrategy
           {
             try
             {
-              ChatroomInfo chatroomInfo = ChatroomInfo.from(chatroom);
-              generator.writeObject(chatroomInfo);
-              writeMessages(chatroomInfo, chatroom.getMessages());
+              ChatroomTo chatroomTo = ChatroomTo.from(chatroom);
+              generator.writeObject(chatroomTo);
+              writeMessages(chatroomTo, chatroom.getMessages());
             }
             catch (IOException e)
             {
@@ -114,12 +114,12 @@ public class LocalJsonFilesStorageStrategy implements StorageStrategy
       if (parser.nextToken() != JsonToken.START_ARRAY)
         throw new IllegalStateException("Expected content to be an array");
 
-      Sinks.Many<ChatroomInfo> many = Sinks.many().unicast().onBackpressureBuffer();
+      Sinks.Many<ChatroomTo> many = Sinks.many().unicast().onBackpressureBuffer();
 
       while (parser.nextToken() != JsonToken.END_ARRAY)
       {
         many
-            .tryEmitNext(mapper.readValue(parser, ChatroomInfo.class))
+            .tryEmitNext(mapper.readValue(parser, ChatroomTo.class))
             .orThrow();
       }
 
@@ -127,10 +127,10 @@ public class LocalJsonFilesStorageStrategy implements StorageStrategy
 
       return many
           .asFlux()
-          .map(chatroomInfo ->
+          .map(chatroomTo ->
           {
             LinkedHashMap<Message.MessageKey, Message> messages =
-                readMessages(chatroomInfo)
+                readMessages(chatroomTo)
                     .collect(Collectors.toMap(
                         Message::getKey,
                         Function.identity(),
@@ -143,7 +143,7 @@ public class LocalJsonFilesStorageStrategy implements StorageStrategy
                         LinkedHashMap::new))
                     .block();
             InMemoryPersistenceStrategy strategy = new InMemoryPersistenceStrategy(messages);
-            return chatroomFactory.restoreChatroom(chatroomInfo.getId(), chatroomInfo.getName(), strategy);
+            return chatroomFactory.restoreChatroom(chatroomTo.getId(), chatroomTo.getName(), strategy);
           });
     }
     catch (NoSuchFileException e)
@@ -158,10 +158,10 @@ public class LocalJsonFilesStorageStrategy implements StorageStrategy
   }
 
   @Override
-  public void writeMessages(ChatroomInfo chatroomInfo, Flux<Message> messageFlux)
+  public void writeMessages(ChatroomTo chatroomTo, Flux<Message> messageFlux)
   {
-    Path path = chatroomPath(chatroomInfo);
-    log.info("Writing messages for {} to {}", chatroomInfo, path);
+    Path path = chatroomPath(chatroomTo);
+    log.info("Writing messages for {} to {}", chatroomTo, path);
     try
     {
       Files.createDirectories(storagePath);
@@ -217,10 +217,10 @@ public class LocalJsonFilesStorageStrategy implements StorageStrategy
   }
 
   @Override
-  public Flux<Message> readMessages(ChatroomInfo chatroomInfo)
+  public Flux<Message> readMessages(ChatroomTo chatroomTo)
   {
-    Path path = chatroomPath(chatroomInfo);
-    log.info("Reading messages for {} from {}", chatroomInfo, path);
+    Path path = chatroomPath(chatroomTo);
+    log.info("Reading messages for {} from {}", chatroomTo, path);
     try
     {
       JsonParser parser =
@@ -249,7 +249,7 @@ public class LocalJsonFilesStorageStrategy implements StorageStrategy
       log.info(
           "{} does not exist - starting with empty chat for {}",
           path,
-          chatroomInfo);
+          chatroomTo);
       return Flux.empty();
     }
     catch (IOException e)
@@ -263,8 +263,8 @@ public class LocalJsonFilesStorageStrategy implements StorageStrategy
     return storagePath.resolve(Path.of(CHATROOMS_FILENAME));
   }
 
-  Path chatroomPath(ChatroomInfo chatroomInfo)
+  Path chatroomPath(ChatroomTo chatroomTo)
   {
-    return storagePath.resolve(Path.of(chatroomInfo.getId().toString() + ".json"));
+    return storagePath.resolve(Path.of(chatroomTo.getId().toString() + ".json"));
   }
 }