feat: The chats are stored as local json-files
authorKai Moritz <kai@juplo.de>
Thu, 5 Jan 2023 15:09:04 +0000 (16:09 +0100)
committerKai Moritz <kai@juplo.de>
Sun, 15 Jan 2023 18:35:59 +0000 (19:35 +0100)
13 files changed:
src/main/java/de/juplo/kafka/chat/backend/ChatBackendApplication.java
src/main/java/de/juplo/kafka/chat/backend/ChatBackendConfiguration.java
src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/api/MessageTo.java
src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java
src/main/java/de/juplo/kafka/chat/backend/domain/Chatroom.java
src/main/java/de/juplo/kafka/chat/backend/domain/ChatroomFactory.java
src/main/java/de/juplo/kafka/chat/backend/domain/PersistenceStrategy.java
src/main/java/de/juplo/kafka/chat/backend/persistence/ChatroomInfo.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/persistence/InMemoryChatroomFactory.java
src/main/java/de/juplo/kafka/chat/backend/persistence/InMemoryPersistenceStrategy.java
src/main/java/de/juplo/kafka/chat/backend/persistence/LocalJsonFilesStorageStrategy.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/persistence/StorageStrategy.java [new file with mode: 0644]

index 7f6f213..37eeeee 100644 (file)
@@ -1,15 +1,29 @@
 package de.juplo.kafka.chat.backend;
 
+import de.juplo.kafka.chat.backend.domain.ChatHome;
+import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
+import jakarta.annotation.PreDestroy;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.context.annotation.Bean;
-
-import java.time.Clock;
+import reactor.core.publisher.Flux;
 
 
 @SpringBootApplication
 public class ChatBackendApplication
 {
+       @Autowired
+       ChatHome chatHome;
+       @Autowired
+       StorageStrategy storageStrategy;
+
+       @PreDestroy
+       public void onExit()
+       {
+               storageStrategy.writeChatrooms(Flux.fromIterable(chatHome.list()));
+       }
+
        public static void main(String[] args)
        {
                SpringApplication.run(ChatBackendApplication.class, args);
index 835627a..6387c6e 100644 (file)
@@ -1,22 +1,46 @@
 package de.juplo.kafka.chat.backend;
 
 import de.juplo.kafka.chat.backend.domain.ChatHome;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import de.juplo.kafka.chat.backend.domain.ChatroomFactory;
+import de.juplo.kafka.chat.backend.domain.Message;
 import de.juplo.kafka.chat.backend.persistence.InMemoryChatroomFactory;
 import de.juplo.kafka.chat.backend.persistence.InMemoryPersistenceStrategy;
+import de.juplo.kafka.chat.backend.persistence.LocalJsonFilesStorageStrategy;
+import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
+import java.nio.file.Paths;
 import java.time.Clock;
+import java.util.LinkedHashMap;
 
 
 @Configuration
+@EnableConfigurationProperties(ChatBackendProperties.class)
 public class ChatBackendConfiguration
 {
   @Bean
-  public ChatHome chatHome(ChatroomFactory chatroomFactory)
+  public ChatHome chatHome(
+      ChatroomFactory chatroomFactory,
+      StorageStrategy storageStrategy)
   {
-    return new ChatHome(chatroomFactory);
+    return new ChatHome(
+        storageStrategy.readChatrooms().collectMap(chatroom -> chatroom.getId()).block(),
+        chatroomFactory);
+  }
+
+  @Bean
+  public StorageStrategy storageStrategy(
+      ChatBackendProperties properties,
+      ObjectMapper mapper,
+      ChatroomFactory chatroomFactory)
+  {
+    return new LocalJsonFilesStorageStrategy(
+        Paths.get(properties.getDatadir()),
+        mapper,
+        chatroomFactory);
   }
 
   @Bean
@@ -28,7 +52,7 @@ public class ChatBackendConfiguration
   @Bean
   InMemoryPersistenceStrategy persistenceStrategy()
   {
-    return new InMemoryPersistenceStrategy();
+    return new InMemoryPersistenceStrategy(new LinkedHashMap<>());
   }
 
   @Bean
diff --git a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java
new file mode 100644 (file)
index 0000000..598e8ce
--- /dev/null
@@ -0,0 +1,16 @@
+package de.juplo.kafka.chat.backend;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+import java.nio.file.Paths;
+
+
+@ConfigurationProperties("chat.backend")
+@Getter
+@Setter
+public class ChatBackendProperties
+{
+  private String datadir = Paths.get(System.getProperty("java.io.tmpdir"),"chat", "backend").toString();
+}
index 53d2319..3525d56 100644 (file)
@@ -17,6 +17,11 @@ public class MessageTo
   private String user;
   private String text;
 
+  public Message toMessage()
+  {
+    return new Message(Message.MessageKey.of(user, id), serial, time, text);
+  }
+
   public static MessageTo from(Message message)
   {
     return
index 84b9e94..2b5152c 100644 (file)
@@ -8,7 +8,7 @@ import java.util.*;
 @RequiredArgsConstructor
 public class ChatHome
 {
-  private final Map<UUID, Chatroom> chatrooms = new HashMap<>();
+  private final Map<UUID, Chatroom> chatrooms;
   private final ChatroomFactory factory;
 
 
index c05fda0..8f4a797 100644 (file)
@@ -44,6 +44,11 @@ public class Chatroom
     return sink.asFlux();
   }
 
+  public Flux<Message> getMessages()
+  {
+    return getMessages(0, Long.MAX_VALUE);
+  }
+
   public Flux<Message> getMessages(long first, long last)
   {
     return persistence.getMessages(first, last);
index 02ce3d2..f5e8c19 100644 (file)
@@ -3,7 +3,8 @@ package de.juplo.kafka.chat.backend.domain;
 import java.util.UUID;
 
 
-public interface ChatroomFactory
+public interface ChatroomFactory<Strategy extends PersistenceStrategy>
 {
   Chatroom createChatroom(UUID id, String name);
+  Chatroom restoreChatroom(UUID id, String name, Strategy strategy);
 }
index 452a62d..d3a8364 100644 (file)
@@ -1,6 +1,5 @@
 package de.juplo.kafka.chat.backend.domain;
 
-import lombok.Value;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/ChatroomInfo.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/ChatroomInfo.java
new file mode 100644 (file)
index 0000000..6536175
--- /dev/null
@@ -0,0 +1,22 @@
+package de.juplo.kafka.chat.backend.persistence;
+
+import de.juplo.kafka.chat.backend.domain.Chatroom;
+import lombok.Data;
+
+import java.util.UUID;
+
+@Data
+public class ChatroomInfo
+{
+  private UUID id;
+  private String name;
+
+
+  public static ChatroomInfo from(Chatroom chatroom)
+  {
+    ChatroomInfo info = new ChatroomInfo();
+    info.id = chatroom.getId();
+    info.name = chatroom.getName();
+    return info;
+  }
+}
index b2b7f1d..231eb37 100644 (file)
@@ -7,8 +7,9 @@ import lombok.RequiredArgsConstructor;
 
 import java.util.UUID;
 
+
 @RequiredArgsConstructor
-public class InMemoryChatroomFactory implements ChatroomFactory
+public class InMemoryChatroomFactory implements ChatroomFactory<InMemoryPersistenceStrategy>
 {
   private final PersistenceStrategy persistenceStrategy;
 
@@ -18,4 +19,13 @@ public class InMemoryChatroomFactory implements ChatroomFactory
   {
     return new Chatroom(id, name, persistenceStrategy);
   }
+
+  @Override
+  public Chatroom restoreChatroom(
+      UUID id,
+      String name,
+      InMemoryPersistenceStrategy persistenceStrategy)
+  {
+    return new Chatroom(id, name, persistenceStrategy);
+  }
 }
index 454419e..19ef343 100644 (file)
@@ -3,7 +3,6 @@ package de.juplo.kafka.chat.backend.persistence;
 import de.juplo.kafka.chat.backend.domain.Message;
 import de.juplo.kafka.chat.backend.domain.MessageMutationException;
 import de.juplo.kafka.chat.backend.domain.PersistenceStrategy;
-import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -12,11 +11,17 @@ import java.time.LocalDateTime;
 import java.util.LinkedHashMap;
 
 
-@RequiredArgsConstructor
 @Slf4j
 public class InMemoryPersistenceStrategy implements PersistenceStrategy
 {
-  private final LinkedHashMap<Message.MessageKey, Message> messages = new LinkedHashMap<>();
+  private final LinkedHashMap<Message.MessageKey, Message> messages;
+
+
+  public InMemoryPersistenceStrategy(LinkedHashMap<Message.MessageKey, Message> messages)
+  {
+    this.messages = messages;
+  }
+
 
   @Override
   public Mono<Message> persistMessage(
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/LocalJsonFilesStorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/LocalJsonFilesStorageStrategy.java
new file mode 100644 (file)
index 0000000..40fa2bc
--- /dev/null
@@ -0,0 +1,270 @@
+package de.juplo.kafka.chat.backend.persistence;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import de.juplo.kafka.chat.backend.api.MessageTo;
+import de.juplo.kafka.chat.backend.domain.Chatroom;
+import de.juplo.kafka.chat.backend.domain.ChatroomFactory;
+import de.juplo.kafka.chat.backend.domain.Message;
+import de.juplo.kafka.chat.backend.domain.MessageMutationException;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Sinks;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.nio.file.Path;
+import java.util.LinkedHashMap;
+import java.util.function.Function;
+import java.util.stream.Collector;
+import java.util.stream.Collectors;
+
+import static java.nio.file.StandardOpenOption.CREATE;
+import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
+
+
+@RequiredArgsConstructor
+@Slf4j
+public class LocalJsonFilesStorageStrategy implements StorageStrategy
+{
+  public static final String CHATROOMS_FILENAME = "chatrooms.json";
+
+
+  private final Path storagePath;
+  private final ObjectMapper mapper;
+  private final ChatroomFactory chatroomFactory;
+
+
+  @Override
+  public void writeChatrooms(Flux<Chatroom> chatroomFlux)
+  {
+    Path path = chatroomsPath();
+    log.info("Writing chatrooms to {}", path);
+    try
+    {
+      Files.createDirectories(storagePath);
+
+      JsonGenerator generator =
+          mapper
+              .getFactory()
+              .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
+
+      chatroomFlux
+          .log()
+          .doFirst(() ->
+          {
+            try
+            {
+              generator.useDefaultPrettyPrinter();
+              generator.writeStartArray();
+            }
+            catch (IOException e)
+            {
+              throw new RuntimeException(e);
+            }
+          })
+          .doOnTerminate(() ->
+          {
+            try
+            {
+              generator.writeEndArray();
+              generator.close();
+            }
+            catch (IOException e)
+            {
+              throw new RuntimeException(e);
+            }
+          })
+          .subscribe(chatroom ->
+          {
+            try
+            {
+              ChatroomInfo chatroomInfo = ChatroomInfo.from(chatroom);
+              generator.writeObject(chatroomInfo);
+              writeMessages(chatroomInfo, chatroom.getMessages());
+            }
+            catch (IOException e)
+            {
+              throw new RuntimeException(e);
+            }
+          });
+    }
+    catch (IOException e)
+    {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public Flux<Chatroom> readChatrooms()
+  {
+    Path path = chatroomsPath();
+    log.info("Reading chatrooms from {}", path);
+    try
+    {
+      JsonParser parser =
+          mapper
+              .getFactory()
+              .createParser(Files.newBufferedReader(path));
+
+      if (parser.nextToken() != JsonToken.START_ARRAY)
+        throw new IllegalStateException("Expected content to be an array");
+
+      Sinks.Many<ChatroomInfo> many = Sinks.many().unicast().onBackpressureBuffer();
+
+      while (parser.nextToken() != JsonToken.END_ARRAY)
+      {
+        many
+            .tryEmitNext(mapper.readValue(parser, ChatroomInfo.class))
+            .orThrow();
+      }
+
+      many.tryEmitComplete().orThrow();
+
+      return many
+          .asFlux()
+          .map(chatroomInfo ->
+          {
+            LinkedHashMap<Message.MessageKey, Message> messages =
+                readMessages(chatroomInfo)
+                    .collect(Collectors.toMap(
+                        Message::getKey,
+                        Function.identity(),
+                        (existing, message) ->
+                        {
+                          if (!message.equals(existing))
+                            throw new MessageMutationException(message, existing);
+                          return existing;
+                        },
+                        LinkedHashMap::new))
+                    .block();
+            InMemoryPersistenceStrategy strategy = new InMemoryPersistenceStrategy(messages);
+            return chatroomFactory.restoreChatroom(chatroomInfo.getId(), chatroomInfo.getName(), strategy);
+          });
+    }
+    catch (NoSuchFileException e)
+    {
+      log.info("{} does not exist - starting with empty ChatHome", path);
+      return Flux.empty();
+    }
+    catch (IOException e)
+    {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void writeMessages(ChatroomInfo chatroomInfo, Flux<Message> messageFlux)
+  {
+    Path path = chatroomPath(chatroomInfo);
+    log.info("Writing messages for {} to {}", chatroomInfo, path);
+    try
+    {
+      Files.createDirectories(storagePath);
+
+      JsonGenerator generator =
+          mapper
+              .getFactory()
+              .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
+
+      messageFlux
+          .log()
+          .doFirst(() ->
+          {
+            try
+            {
+              generator.useDefaultPrettyPrinter();
+              generator.writeStartArray();
+            }
+            catch (IOException e)
+            {
+              throw new RuntimeException(e);
+            }
+          })
+          .doOnTerminate(() ->
+          {
+            try
+            {
+              generator.writeEndArray();
+              generator.close();
+            }
+            catch (IOException e)
+            {
+              throw new RuntimeException(e);
+            }
+          })
+          .subscribe(message ->
+          {
+            try
+            {
+              MessageTo messageTo = MessageTo.from(message);
+              generator.writeObject(messageTo);
+            }
+            catch (IOException e)
+            {
+              throw new RuntimeException(e);
+            }
+          });
+    }
+    catch (IOException e)
+    {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public Flux<Message> readMessages(ChatroomInfo chatroomInfo)
+  {
+    Path path = chatroomPath(chatroomInfo);
+    log.info("Reading messages for {} from {}", chatroomInfo, path);
+    try
+    {
+      JsonParser parser =
+          mapper
+              .getFactory()
+              .createParser(Files.newBufferedReader(path));
+
+      if (parser.nextToken() != JsonToken.START_ARRAY)
+        throw new IllegalStateException("Expected content to be an array");
+
+      Sinks.Many<Message> many = Sinks.many().unicast().onBackpressureBuffer();
+
+      while (parser.nextToken() != JsonToken.END_ARRAY)
+      {
+        many
+            .tryEmitNext(mapper.readValue(parser, MessageTo.class).toMessage())
+            .orThrow();
+      }
+
+      many.tryEmitComplete().orThrow();
+
+      return many.asFlux();
+    }
+    catch (NoSuchFileException e)
+    {
+      log.info(
+          "{} does not exist - starting with empty chat for {}",
+          path,
+          chatroomInfo);
+      return Flux.empty();
+    }
+    catch (IOException e)
+    {
+      throw new RuntimeException(e);
+    }
+  }
+
+  Path chatroomsPath()
+  {
+    return storagePath.resolve(Path.of(CHATROOMS_FILENAME));
+  }
+
+  Path chatroomPath(ChatroomInfo chatroomInfo)
+  {
+    return storagePath.resolve(Path.of(chatroomInfo.getId().toString() + ".json"));
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/StorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/StorageStrategy.java
new file mode 100644 (file)
index 0000000..3c268ca
--- /dev/null
@@ -0,0 +1,14 @@
+package de.juplo.kafka.chat.backend.persistence;
+
+import de.juplo.kafka.chat.backend.domain.Chatroom;
+import de.juplo.kafka.chat.backend.domain.Message;
+import reactor.core.publisher.Flux;
+
+
+public interface StorageStrategy
+{
+  void writeChatrooms(Flux<Chatroom> chatroomFlux);
+  Flux<Chatroom> readChatrooms();
+  void writeMessages(ChatroomInfo chatroomInfo, Flux<Message> messageFlux);
+  Flux<Message> readMessages(ChatroomInfo chatroomInfo);
+}