TMP:test -- FIX: `ChatRoomData` active/inactive
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / inmemory / SimpleChatHomeService.java
index 93593d8..8e3cc43 100644 (file)
@@ -18,34 +18,41 @@ public class SimpleChatHomeService implements ChatHomeService
   private final Map<UUID, ChatRoomInfo> chatRoomInfo;
   private final Map<UUID, ChatRoomData> chatRoomData;
   private final Clock clock;
-  private final int bufferSize;
+  private final int historyLimit;
 
 
 
   public SimpleChatHomeService(
-      StorageStrategy storageStrategy,
       Clock clock,
-      int bufferSize)
+      int historyLimit)
   {
     this(
         null,
-        storageStrategy,
         clock,
-        bufferSize);
+        historyLimit);
   }
 
   public SimpleChatHomeService(
       Integer shard,
-      StorageStrategy storageStrategy,
       Clock clock,
-      int bufferSize)
+      int historyLimit)
   {
-    this.shard = shard;
-    log.info("Created {}", this);
+    log.debug("Creating SimpleChatHomeService");
 
+    this.shard = shard;
     this.chatRoomInfo = new HashMap<>();
     this.chatRoomData = new HashMap<>();
-    storageStrategy
+    this.clock = clock;
+    this.historyLimit = historyLimit;
+  }
+
+
+  Mono<Void> restore(StorageStrategy storageStrategy)
+  {
+    chatRoomInfo.clear();
+    chatRoomData.clear();
+
+    return storageStrategy
         .readChatRoomInfo()
         .filter(info ->
         {
@@ -62,33 +69,39 @@ public class SimpleChatHomeService implements ChatHomeService
             return false;
           }
         })
-        .toStream()
-        .forEach(info ->
+        .flatMap(info ->
         {
           UUID chatRoomId = info.getId();
+          InMemoryChatMessageService chatMessageService =
+              new InMemoryChatMessageService(chatRoomId);
+
           chatRoomInfo.put(chatRoomId, info);
-          Flux<Message> messageFlux =
-              storageStrategy.readChatRoomData(chatRoomId);
-          chatRoomData.put(
-              info.getId(),
+          ChatRoomData chatRoomData =
               new ChatRoomData(
                   clock,
-                  new InMemoryChatMessageService(messageFlux),
-                  bufferSize));
-        });
-    this.clock = clock;
-    this.bufferSize = bufferSize;
+                  chatMessageService,
+                  historyLimit);
+          chatRoomData.activate();
+          this.chatRoomData.put(info.getId(), chatRoomData);
+
+          return chatMessageService.restore(storageStrategy);
+        })
+        .count()
+        .doOnSuccess(count -> log.info("Restored {} with {} chat-rooms", this, count))
+        .doOnError(throwable -> log.error("Could not restore {}", this))
+        .then();
   }
 
 
   @Override
   public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
   {
-    log.info("Creating ChatRoom with buffer-size {}", bufferSize);
-    ChatMessageService service = new InMemoryChatMessageService(Flux.empty());
+    log.info("Creating ChatRoom with history-limit {}", historyLimit);
+    ChatMessageService service = new InMemoryChatMessageService(id);
     ChatRoomInfo chatRoomInfo = new ChatRoomInfo(id, name, shard);
     this.chatRoomInfo.put(id, chatRoomInfo);
-    ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize);
+    ChatRoomData chatRoomData = new ChatRoomData(clock, service, historyLimit);
+    chatRoomData.activate();
     this.chatRoomData.put(id, chatRoomData);
     return Mono.just(chatRoomInfo);
   }