refactor: RED - Refined success/error-handling for restore-operations
authorKai Moritz <kai@juplo.de>
Tue, 20 Feb 2024 15:12:00 +0000 (16:12 +0100)
committerKai Moritz <kai@juplo.de>
Thu, 22 Feb 2024 15:52:24 +0000 (16:52 +0100)
* This innocent little change discloses a severe missconception in the
  implementation of the storage strategies.
* The call to `Mono.block()`, though not really changing the behaviour
  during the restore-process, triggers a sanity-check from
  io.projectractor.

src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryChatMessageService.java
src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeService.java

index 8f3e495..0345c00 100644 (file)
@@ -20,7 +20,12 @@ public class InMemoryChatMessageService implements ChatMessageService
   {
     log.debug("Creating InMemoryChatMessageService");
     messages = new LinkedHashMap<>();
-    messageFlux.subscribe(message -> messages.put(message.getKey(), message));
+    messageFlux
+        .doOnNext(message -> messages.put(message.getKey(), message))
+        .then()
+        .doOnSuccess(empty -> log.info("Restored InMemoryChatMessageService"))
+        .doOnError(throwable -> log.error("Could not restore InMemoryChatMessageService"))
+        .block();
   }
 
   @Override
index 93593d8..dfe8567 100644 (file)
@@ -40,11 +40,12 @@ public class SimpleChatHomeService implements ChatHomeService
       Clock clock,
       int bufferSize)
   {
-    this.shard = shard;
-    log.info("Created {}", this);
+    log.debug("Creating SimpleChatHomeService");
 
+    this.shard = shard;
     this.chatRoomInfo = new HashMap<>();
     this.chatRoomData = new HashMap<>();
+
     storageStrategy
         .readChatRoomInfo()
         .filter(info ->
@@ -62,8 +63,7 @@ public class SimpleChatHomeService implements ChatHomeService
             return false;
           }
         })
-        .toStream()
-        .forEach(info ->
+        .doOnNext(info ->
         {
           UUID chatRoomId = info.getId();
           chatRoomInfo.put(chatRoomId, info);
@@ -75,7 +75,12 @@ public class SimpleChatHomeService implements ChatHomeService
                   clock,
                   new InMemoryChatMessageService(messageFlux),
                   bufferSize));
-        });
+        })
+        .then()
+        .doOnSuccess(empty -> log.info("Restored {}", this))
+        .doOnError(throwable -> log.error("Could not restore {}", this))
+        .block();
+
     this.clock = clock;
     this.bufferSize = bufferSize;
   }