From: Kai Moritz Date: Tue, 20 Feb 2024 15:12:00 +0000 (+0100) Subject: refactor: RED - Refined success/error-handling for restore-operations X-Git-Tag: rebase--2024-02-23--16-28~21 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;ds=sidebyside;h=803a9193c8b1ad1ec68f1818fc091291a60c8f4c;p=demos%2Fkafka%2Fchat refactor: RED - Refined success/error-handling for restore-operations * This innocent little change discloses a severe missconception in the implementation of the storage strategies. * The call to `Mono.block()`, though not really changing the behaviour during the restore-process, triggers a sanity-check from io.projectractor. --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryChatMessageService.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryChatMessageService.java index 8f3e4956..0345c008 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryChatMessageService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryChatMessageService.java @@ -20,7 +20,12 @@ public class InMemoryChatMessageService implements ChatMessageService { log.debug("Creating InMemoryChatMessageService"); messages = new LinkedHashMap<>(); - messageFlux.subscribe(message -> messages.put(message.getKey(), message)); + messageFlux + .doOnNext(message -> messages.put(message.getKey(), message)) + .then() + .doOnSuccess(empty -> log.info("Restored InMemoryChatMessageService")) + .doOnError(throwable -> log.error("Could not restore InMemoryChatMessageService")) + .block(); } @Override diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeService.java index 93593d86..dfe8567a 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeService.java @@ -40,11 +40,12 @@ public class SimpleChatHomeService implements ChatHomeService Clock clock, int bufferSize) { - this.shard = shard; - log.info("Created {}", this); + log.debug("Creating SimpleChatHomeService"); + this.shard = shard; this.chatRoomInfo = new HashMap<>(); this.chatRoomData = new HashMap<>(); + storageStrategy .readChatRoomInfo() .filter(info -> @@ -62,8 +63,7 @@ public class SimpleChatHomeService implements ChatHomeService return false; } }) - .toStream() - .forEach(info -> + .doOnNext(info -> { UUID chatRoomId = info.getId(); chatRoomInfo.put(chatRoomId, info); @@ -75,7 +75,12 @@ public class SimpleChatHomeService implements ChatHomeService clock, new InMemoryChatMessageService(messageFlux), bufferSize)); - }); + }) + .then() + .doOnSuccess(empty -> log.info("Restored {}", this)) + .doOnError(throwable -> log.error("Could not restore {}", this)) + .block(); + this.clock = clock; this.bufferSize = bufferSize; }