refactor: `LocalJsonFilesStorageStrategy` is now realy reactive
authorKai Moritz <kai@juplo.de>
Sat, 7 Jan 2023 20:58:27 +0000 (21:58 +0100)
committerKai Moritz <kai@juplo.de>
Sun, 15 Jan 2023 18:35:59 +0000 (19:35 +0100)
src/main/java/de/juplo/kafka/chat/backend/ChatBackendConfiguration.java
src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java
src/main/java/de/juplo/kafka/chat/backend/persistence/InMemoryPersistenceStrategy.java
src/main/java/de/juplo/kafka/chat/backend/persistence/JsonFilePublisher.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/persistence/LocalJsonFilesStorageStrategy.java

index adabb92..b590854 100644 (file)
@@ -23,9 +23,7 @@ public class ChatBackendConfiguration
       ChatroomFactory chatroomFactory,
       StorageStrategy storageStrategy)
   {
-    return new ChatHome(
-        storageStrategy.readChatrooms().collectMap(chatroom -> chatroom.getId()).block(),
-        chatroomFactory);
+    return new ChatHome(chatroomFactory, storageStrategy.readChatrooms());
   }
 
   @Bean
index d04bd73..ed4d8c7 100644 (file)
@@ -1,17 +1,25 @@
 package de.juplo.kafka.chat.backend.domain;
 
-import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import reactor.core.publisher.Flux;
 
 import java.util.*;
 import java.util.stream.Stream;
 
 
-@RequiredArgsConstructor
+@Slf4j
 public class ChatHome
 {
   private final Map<UUID, Chatroom> chatrooms;
   private final ChatroomFactory factory;
 
+  public ChatHome(ChatroomFactory factory, Flux<Chatroom> chatroomFlux)
+  {
+    log.debug("Creating ChatHome with factory: {}", factory);
+    this.factory = factory;
+    this.chatrooms = new HashMap<>();
+    chatroomFlux.subscribe(chatroom -> chatrooms.put(chatroom.getId(), chatroom));
+  }
 
   public Chatroom createChatroom(String name)
   {
index 19ef343..4b522a8 100644 (file)
@@ -22,6 +22,12 @@ public class InMemoryPersistenceStrategy implements PersistenceStrategy
     this.messages = messages;
   }
 
+  public InMemoryPersistenceStrategy(Flux<Message> messageFlux)
+  {
+    log.debug("Creating InMemoryPersistenceStrategy");
+    messages = new LinkedHashMap<>();
+    messageFlux.subscribe(message -> persistMessage(message));
+  }
 
   @Override
   public Mono<Message> persistMessage(
@@ -30,20 +36,23 @@ public class InMemoryPersistenceStrategy implements PersistenceStrategy
       String text)
   {
     Message message = new Message(key, (long)messages.size(), timestamp, text);
+    return Mono.justOrEmpty(persistMessage(message));
+  }
 
+  private Message persistMessage(Message message)
+  {
+    Message.MessageKey key = message.getKey();
     Message existing = messages.get(key);
     if (existing != null)
     {
       log.info("Message with key {} already exists; {}", key, existing);
       if (!message.equals(existing))
         throw new MessageMutationException(message, existing);
-      return Mono.empty();
+      return null;
     }
 
     messages.put(key, message);
-    return Mono
-        .fromSupplier(() -> message)
-        .log();
+    return message;
   }
 
   @Override
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/JsonFilePublisher.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/JsonFilePublisher.java
new file mode 100644 (file)
index 0000000..e502539
--- /dev/null
@@ -0,0 +1,118 @@
+package de.juplo.kafka.chat.backend.persistence;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.reactivestreams.Publisher;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.nio.file.Path;
+import java.util.Iterator;
+import java.util.List;
+import java.util.function.Consumer;
+
+
+@RequiredArgsConstructor
+@Slf4j
+public class JsonFilePublisher<T> implements Publisher<T>
+{
+  private final Path path;
+  private final ObjectMapper mapper;
+  private final JavaType type;
+
+
+  @Override
+  public void subscribe(Subscriber<? super T> subscriber)
+  {
+    log.info("Reading chatrooms from {}", path);
+
+    try
+    {
+      JsonParser parser =
+          mapper.getFactory().createParser(Files.newBufferedReader(path));
+
+      if (parser.nextToken() != JsonToken.START_ARRAY)
+      {
+        throw new IllegalStateException("Expected content to be an array");
+      }
+
+      subscriber.onSubscribe(new JsonFileSubscription(subscriber, parser));
+    }
+    catch (NoSuchFileException e)
+    {
+      log.info("{} does not exist - starting with empty ChatHome", path);
+      subscriber.onSubscribe(new ReplaySubscription(subscriber, List.of()));
+    }
+    catch (IOException | IllegalStateException e)
+    {
+      subscriber.onSubscribe(new ReplaySubscription(subscriber, List.of((s -> s.onError(e)))));
+    }
+  }
+
+  @RequiredArgsConstructor
+  private class JsonFileSubscription implements Subscription
+  {
+    private final Subscriber<? super T> subscriber;
+    private final JsonParser parser;
+
+    @Override
+    public void request(long requested)
+    {
+      try
+      {
+        while (requested > 0 && parser.nextToken() != JsonToken.END_ARRAY)
+        {
+          subscriber.onNext(mapper.readValue(parser, type));
+          requested--;
+        }
+
+        if (requested > 0)
+          subscriber.onComplete();
+      }
+      catch (IOException e)
+      {
+        subscriber.onError(e);
+      }
+    }
+
+    @Override
+    public void cancel() {}
+  }
+
+  private class ReplaySubscription implements Subscription
+  {
+    private final Subscriber<? super T> subscriber;
+    private final Iterator<Consumer<Subscriber<? super T>>> iterator;
+
+    ReplaySubscription(
+        Subscriber<? super T> subscriber,
+        Iterable<Consumer<Subscriber<? super T>>> actions)
+    {
+      this.subscriber = subscriber;
+      this.iterator = actions.iterator();
+    }
+
+    @Override
+    public void request(long requested)
+    {
+      while (requested > 0 && iterator.hasNext())
+      {
+        iterator.next().accept(subscriber);
+        requested--;
+      }
+
+      if (requested > 0)
+        subscriber.onComplete();
+    }
+
+    @Override
+    public void cancel() {}
+  }
+}
index a90e2eb..c195893 100644 (file)
@@ -1,27 +1,20 @@
 package de.juplo.kafka.chat.backend.persistence;
 
 import com.fasterxml.jackson.core.JsonGenerator;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.JavaType;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import de.juplo.kafka.chat.backend.api.ChatroomTo;
 import de.juplo.kafka.chat.backend.api.MessageTo;
 import de.juplo.kafka.chat.backend.domain.Chatroom;
 import de.juplo.kafka.chat.backend.domain.ChatroomFactory;
 import de.juplo.kafka.chat.backend.domain.Message;
-import de.juplo.kafka.chat.backend.domain.MessageMutationException;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import reactor.core.publisher.Flux;
-import reactor.core.publisher.Sinks;
 
 import java.io.IOException;
 import java.nio.file.Files;
-import java.nio.file.NoSuchFileException;
 import java.nio.file.Path;
-import java.util.LinkedHashMap;
-import java.util.function.Function;
-import java.util.stream.Collectors;
 
 import static java.nio.file.StandardOpenOption.CREATE;
 import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
@@ -102,59 +95,16 @@ public class LocalJsonFilesStorageStrategy implements StorageStrategy
   @Override
   public Flux<Chatroom> readChatrooms()
   {
-    Path path = chatroomsPath();
-    log.info("Reading chatrooms from {}", path);
-    try
-    {
-      JsonParser parser =
-          mapper
-              .getFactory()
-              .createParser(Files.newBufferedReader(path));
-
-      if (parser.nextToken() != JsonToken.START_ARRAY)
-        throw new IllegalStateException("Expected content to be an array");
-
-      Sinks.Many<ChatroomTo> many = Sinks.many().unicast().onBackpressureBuffer();
-
-      while (parser.nextToken() != JsonToken.END_ARRAY)
-      {
-        many
-            .tryEmitNext(mapper.readValue(parser, ChatroomTo.class))
-            .orThrow();
-      }
-
-      many.tryEmitComplete().orThrow();
-
-      return many
-          .asFlux()
-          .map(chatroomTo ->
-          {
-            LinkedHashMap<Message.MessageKey, Message> messages =
-                readMessages(chatroomTo)
-                    .collect(Collectors.toMap(
-                        Message::getKey,
-                        Function.identity(),
-                        (existing, message) ->
-                        {
-                          if (!message.equals(existing))
-                            throw new MessageMutationException(message, existing);
-                          return existing;
-                        },
-                        LinkedHashMap::new))
-                    .block();
-            InMemoryPersistenceStrategy strategy = new InMemoryPersistenceStrategy(messages);
-            return chatroomFactory.restoreChatroom(chatroomTo.getId(), chatroomTo.getName(), strategy);
-          });
-    }
-    catch (NoSuchFileException e)
-    {
-      log.info("{} does not exist - starting with empty ChatHome", path);
-      return Flux.empty();
-    }
-    catch (IOException e)
-    {
-      throw new RuntimeException(e);
-    }
+    JavaType type = mapper.getTypeFactory().constructType(ChatroomTo.class);
+    return Flux
+        .from(new JsonFilePublisher<ChatroomTo>(chatroomsPath(), mapper, type))
+        .log()
+        .map(chatroomTo ->
+        {
+          InMemoryPersistenceStrategy strategy =
+              new InMemoryPersistenceStrategy(readMessages(chatroomTo));
+          return chatroomFactory.restoreChatroom(chatroomTo.getId(), chatroomTo.getName(), strategy);
+        });
   }
 
   @Override
@@ -219,43 +169,11 @@ public class LocalJsonFilesStorageStrategy implements StorageStrategy
   @Override
   public Flux<Message> readMessages(ChatroomTo chatroomTo)
   {
-    Path path = chatroomPath(chatroomTo);
-    log.info("Reading messages for {} from {}", chatroomTo, path);
-    try
-    {
-      JsonParser parser =
-          mapper
-              .getFactory()
-              .createParser(Files.newBufferedReader(path));
-
-      if (parser.nextToken() != JsonToken.START_ARRAY)
-        throw new IllegalStateException("Expected content to be an array");
-
-      Sinks.Many<Message> many = Sinks.many().unicast().onBackpressureBuffer();
-
-      while (parser.nextToken() != JsonToken.END_ARRAY)
-      {
-        many
-            .tryEmitNext(mapper.readValue(parser, MessageTo.class).toMessage())
-            .orThrow();
-      }
-
-      many.tryEmitComplete().orThrow();
-
-      return many.asFlux();
-    }
-    catch (NoSuchFileException e)
-    {
-      log.info(
-          "{} does not exist - starting with empty chat for {}",
-          path,
-          chatroomTo);
-      return Flux.empty();
-    }
-    catch (IOException e)
-    {
-      throw new RuntimeException(e);
-    }
+    JavaType type = mapper.getTypeFactory().constructType(MessageTo.class);
+    return Flux
+        .from(new JsonFilePublisher<MessageTo>(chatroomPath(chatroomTo), mapper, type))
+        .log()
+        .map(MessageTo::toMessage);
   }
 
   Path chatroomsPath()