{
log.debug("Creating InMemoryChatMessageService");
messages = new LinkedHashMap<>();
- messageFlux.subscribe(message -> messages.put(message.getKey(), message));
+ messageFlux
+ .doOnNext(message -> messages.put(message.getKey(), message))
+ .then()
+ .doOnSuccess(empty -> log.info("Restored InMemoryChatMessageService"))
+ .doOnError(throwable -> log.error("Could not restore InMemoryChatMessageService"))
+ .block();
}
@Override
Clock clock,
int bufferSize)
{
- this.shard = shard;
- log.info("Created {}", this);
+ log.debug("Creating SimpleChatHomeService");
+ this.shard = shard;
this.chatRoomInfo = new HashMap<>();
this.chatRoomData = new HashMap<>();
+
storageStrategy
.readChatRoomInfo()
.filter(info ->
return false;
}
})
- .toStream()
- .forEach(info ->
+ .doOnNext(info ->
{
UUID chatRoomId = info.getId();
chatRoomInfo.put(chatRoomId, info);
clock,
new InMemoryChatMessageService(messageFlux),
bufferSize));
- });
+ })
+ .then()
+ .doOnSuccess(empty -> log.info("Restored {}", this))
+ .doOnError(throwable -> log.error("Could not restore {}", this))
+ .block();
+
this.clock = clock;
this.bufferSize = bufferSize;
}