fix: GREEN - Fixed the restore-mechanism
authorKai Moritz <kai@juplo.de>
Thu, 22 Feb 2024 15:03:08 +0000 (16:03 +0100)
committerKai Moritz <kai@juplo.de>
Thu, 22 Feb 2024 15:52:24 +0000 (16:52 +0100)
* The code of a reactive flow _must not_ call blocking functions.
* In order to solve this, the restore-process is triggered explicitly
  after the creation of the classes.

src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryChatMessageService.java
src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryServicesConfiguration.java
src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeService.java

index 0345c00..7d4b9b6 100644 (file)
@@ -2,30 +2,40 @@ package de.juplo.kafka.chat.backend.implementation.inmemory;
 
 import de.juplo.kafka.chat.backend.domain.ChatMessageService;
 import de.juplo.kafka.chat.backend.domain.Message;
+import de.juplo.kafka.chat.backend.implementation.StorageStrategy;
 import lombok.extern.slf4j.Slf4j;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 import java.time.LocalDateTime;
 import java.util.LinkedHashMap;
+import java.util.UUID;
 
 
 @Slf4j
 public class InMemoryChatMessageService implements ChatMessageService
 {
+  private final UUID chatRoomId;
   private final LinkedHashMap<Message.MessageKey, Message> messages;
 
 
-  public InMemoryChatMessageService(Flux<Message> messageFlux)
+  public InMemoryChatMessageService(UUID chatRoomId)
   {
     log.debug("Creating InMemoryChatMessageService");
+    this.chatRoomId = chatRoomId;
     messages = new LinkedHashMap<>();
-    messageFlux
+  }
+
+
+  Mono<Void> restore(StorageStrategy storageStrategy)
+  {
+    Flux<Message> messageFlux = storageStrategy.readChatRoomData(chatRoomId);
+
+    return messageFlux
         .doOnNext(message -> messages.put(message.getKey(), message))
         .then()
         .doOnSuccess(empty -> log.info("Restored InMemoryChatMessageService"))
-        .doOnError(throwable -> log.error("Could not restore InMemoryChatMessageService"))
-        .block();
+        .doOnError(throwable -> log.error("Could not restore InMemoryChatMessageService"));
   }
 
   @Override
index 5b5785e..518cf41 100644 (file)
@@ -31,10 +31,11 @@ public class InMemoryServicesConfiguration
       StorageStrategy storageStrategy,
       Clock clock)
   {
-    return new SimpleChatHomeService(
-        storageStrategy,
+    SimpleChatHomeService chatHomeService = new SimpleChatHomeService(
         clock,
         properties.getChatroomBufferSize());
+    chatHomeService.restore(storageStrategy).block();
+    return chatHomeService;
   }
 
   @Bean
@@ -51,11 +52,14 @@ public class InMemoryServicesConfiguration
     SimpleChatHomeService[] chatHomes = new SimpleChatHomeService[numShards];
     IntStream
         .of(properties.getInmemory().getOwnedShards())
-        .forEach(shard -> chatHomes[shard] = new SimpleChatHomeService(
-            shard,
-            storageStrategy,
-            clock,
-            properties.getChatroomBufferSize()));
+        .forEach(shard ->
+        {
+          SimpleChatHomeService service = chatHomes[shard] = new SimpleChatHomeService(
+              shard,
+              clock,
+              properties.getChatroomBufferSize());
+          service.restore(storageStrategy).block();
+        });
     ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards);
     return new ShardedChatHomeService(
         properties.getInstanceId(),
index dfe8567..5c3fe2e 100644 (file)
@@ -23,20 +23,17 @@ public class SimpleChatHomeService implements ChatHomeService
 
 
   public SimpleChatHomeService(
-      StorageStrategy storageStrategy,
       Clock clock,
       int bufferSize)
   {
     this(
         null,
-        storageStrategy,
         clock,
         bufferSize);
   }
 
   public SimpleChatHomeService(
       Integer shard,
-      StorageStrategy storageStrategy,
       Clock clock,
       int bufferSize)
   {
@@ -45,8 +42,14 @@ public class SimpleChatHomeService implements ChatHomeService
     this.shard = shard;
     this.chatRoomInfo = new HashMap<>();
     this.chatRoomData = new HashMap<>();
+    this.clock = clock;
+    this.bufferSize = bufferSize;
+  }
 
-    storageStrategy
+
+  Mono<Void> restore(StorageStrategy storageStrategy)
+  {
+    return storageStrategy
         .readChatRoomInfo()
         .filter(info ->
         {
@@ -63,26 +66,25 @@ public class SimpleChatHomeService implements ChatHomeService
             return false;
           }
         })
-        .doOnNext(info ->
+        .flatMap(info ->
         {
           UUID chatRoomId = info.getId();
+          InMemoryChatMessageService chatMessageService =
+              new InMemoryChatMessageService(chatRoomId);
+
           chatRoomInfo.put(chatRoomId, info);
-          Flux<Message> messageFlux =
-              storageStrategy.readChatRoomData(chatRoomId);
           chatRoomData.put(
               info.getId(),
               new ChatRoomData(
                   clock,
-                  new InMemoryChatMessageService(messageFlux),
+                  chatMessageService,
                   bufferSize));
+
+          return chatMessageService.restore(storageStrategy);
         })
         .then()
         .doOnSuccess(empty -> log.info("Restored {}", this))
-        .doOnError(throwable -> log.error("Could not restore {}", this))
-        .block();
-
-    this.clock = clock;
-    this.bufferSize = bufferSize;
+        .doOnError(throwable -> log.error("Could not restore {}", this));
   }
 
 
@@ -90,7 +92,7 @@ public class SimpleChatHomeService implements ChatHomeService
   public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
   {
     log.info("Creating ChatRoom with buffer-size {}", bufferSize);
-    ChatMessageService service = new InMemoryChatMessageService(Flux.empty());
+    ChatMessageService service = new InMemoryChatMessageService(id);
     ChatRoomInfo chatRoomInfo = new ChatRoomInfo(id, name, shard);
     this.chatRoomInfo.put(id, chatRoomInfo);
     ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize);