feat: Added counting of restored instances
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / inmemory / SimpleChatHomeService.java
index 30a181e..d568a9b 100644 (file)
@@ -23,29 +23,36 @@ public class SimpleChatHomeService implements ChatHomeService
 
 
   public SimpleChatHomeService(
-      StorageStrategy storageStrategy,
       Clock clock,
       int bufferSize)
   {
     this(
         null,
-        storageStrategy,
         clock,
         bufferSize);
   }
 
   public SimpleChatHomeService(
       Integer shard,
-      StorageStrategy storageStrategy,
       Clock clock,
       int bufferSize)
   {
-    log.info("Created SimpleChatHome for shard {}", shard);
-;
+    log.debug("Creating SimpleChatHomeService");
+
     this.shard = shard;
     this.chatRoomInfo = new HashMap<>();
     this.chatRoomData = new HashMap<>();
-    storageStrategy
+    this.clock = clock;
+    this.bufferSize = bufferSize;
+  }
+
+
+  Mono<Void> restore(StorageStrategy storageStrategy)
+  {
+    chatRoomInfo.clear();
+    chatRoomData.clear();
+
+    return storageStrategy
         .readChatRoomInfo()
         .filter(info ->
         {
@@ -62,22 +69,26 @@ public class SimpleChatHomeService implements ChatHomeService
             return false;
           }
         })
-        .toStream()
-        .forEach(info ->
+        .flatMap(info ->
         {
           UUID chatRoomId = info.getId();
+          InMemoryChatMessageService chatMessageService =
+              new InMemoryChatMessageService(chatRoomId);
+
           chatRoomInfo.put(chatRoomId, info);
-          Flux<Message> messageFlux =
-              storageStrategy.readChatRoomData(chatRoomId);
           chatRoomData.put(
               info.getId(),
               new ChatRoomData(
                   clock,
-                  new InMemoryChatMessageService(messageFlux),
+                  chatMessageService,
                   bufferSize));
-        });
-    this.clock = clock;
-    this.bufferSize = bufferSize;
+
+          return chatMessageService.restore(storageStrategy);
+        })
+        .count()
+        .doOnSuccess(count -> log.info("Restored {} with {} chat-rooms", this, count))
+        .doOnError(throwable -> log.error("Could not restore {}", this))
+        .then();
   }
 
 
@@ -85,7 +96,7 @@ public class SimpleChatHomeService implements ChatHomeService
   public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
   {
     log.info("Creating ChatRoom with buffer-size {}", bufferSize);
-    ChatMessageService service = new InMemoryChatMessageService(Flux.empty());
+    ChatMessageService service = new InMemoryChatMessageService(id);
     ChatRoomInfo chatRoomInfo = new ChatRoomInfo(id, name, shard);
     this.chatRoomInfo.put(id, chatRoomInfo);
     ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize);
@@ -114,4 +125,16 @@ public class SimpleChatHomeService implements ChatHomeService
         .justOrEmpty(chatRoomData.get(id))
         .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id)));
   }
+
+  @Override
+  public Mono<String[]> getShardOwners()
+  {
+    return Mono.empty();
+  }
+
+  @Override
+  public String toString()
+  {
+    return SimpleChatHomeService.class.getSimpleName() + ", shard=" + shard;
+  }
 }