TMP:test -- FIX: `ChatRoomData` active/inactive
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / inmemory / SimpleChatHomeService.java
index 371d4a8..8e3cc43 100644 (file)
@@ -18,24 +18,24 @@ public class SimpleChatHomeService implements ChatHomeService
   private final Map<UUID, ChatRoomInfo> chatRoomInfo;
   private final Map<UUID, ChatRoomData> chatRoomData;
   private final Clock clock;
-  private final int bufferSize;
+  private final int historyLimit;
 
 
 
   public SimpleChatHomeService(
       Clock clock,
-      int bufferSize)
+      int historyLimit)
   {
     this(
         null,
         clock,
-        bufferSize);
+        historyLimit);
   }
 
   public SimpleChatHomeService(
       Integer shard,
       Clock clock,
-      int bufferSize)
+      int historyLimit)
   {
     log.debug("Creating SimpleChatHomeService");
 
@@ -43,7 +43,7 @@ public class SimpleChatHomeService implements ChatHomeService
     this.chatRoomInfo = new HashMap<>();
     this.chatRoomData = new HashMap<>();
     this.clock = clock;
-    this.bufferSize = bufferSize;
+    this.historyLimit = historyLimit;
   }
 
 
@@ -76,29 +76,32 @@ public class SimpleChatHomeService implements ChatHomeService
               new InMemoryChatMessageService(chatRoomId);
 
           chatRoomInfo.put(chatRoomId, info);
-          chatRoomData.put(
-              info.getId(),
+          ChatRoomData chatRoomData =
               new ChatRoomData(
                   clock,
                   chatMessageService,
-                  bufferSize));
+                  historyLimit);
+          chatRoomData.activate();
+          this.chatRoomData.put(info.getId(), chatRoomData);
 
           return chatMessageService.restore(storageStrategy);
         })
-        .then()
-        .doOnSuccess(empty -> log.info("Restored {}", this))
-        .doOnError(throwable -> log.error("Could not restore {}", this));
+        .count()
+        .doOnSuccess(count -> log.info("Restored {} with {} chat-rooms", this, count))
+        .doOnError(throwable -> log.error("Could not restore {}", this))
+        .then();
   }
 
 
   @Override
   public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
   {
-    log.info("Creating ChatRoom with buffer-size {}", bufferSize);
+    log.info("Creating ChatRoom with history-limit {}", historyLimit);
     ChatMessageService service = new InMemoryChatMessageService(id);
     ChatRoomInfo chatRoomInfo = new ChatRoomInfo(id, name, shard);
     this.chatRoomInfo.put(id, chatRoomInfo);
-    ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize);
+    ChatRoomData chatRoomData = new ChatRoomData(clock, service, historyLimit);
+    chatRoomData.activate();
     this.chatRoomData.put(id, chatRoomData);
     return Mono.just(chatRoomInfo);
   }