* This innocent little change discloses a severe missconception in the
implementation of the storage strategies.
* The call to `Mono.block()`, though not really changing the behaviour
during the restore-process, triggers a sanity-check from
io.projectractor.
{
log.debug("Creating InMemoryChatMessageService");
messages = new LinkedHashMap<>();
{
log.debug("Creating InMemoryChatMessageService");
messages = new LinkedHashMap<>();
- messageFlux.subscribe(message -> messages.put(message.getKey(), message));
+ messageFlux
+ .doOnNext(message -> messages.put(message.getKey(), message))
+ .then()
+ .doOnSuccess(empty -> log.info("Restored InMemoryChatMessageService"))
+ .doOnError(throwable -> log.error("Could not restore InMemoryChatMessageService"))
+ .block();
Clock clock,
int bufferSize)
{
Clock clock,
int bufferSize)
{
- this.shard = shard;
- log.info("Created {}", this);
+ log.debug("Creating SimpleChatHomeService");
this.chatRoomInfo = new HashMap<>();
this.chatRoomData = new HashMap<>();
this.chatRoomInfo = new HashMap<>();
this.chatRoomData = new HashMap<>();
storageStrategy
.readChatRoomInfo()
.filter(info ->
storageStrategy
.readChatRoomInfo()
.filter(info ->
- .toStream()
- .forEach(info ->
{
UUID chatRoomId = info.getId();
chatRoomInfo.put(chatRoomId, info);
{
UUID chatRoomId = info.getId();
chatRoomInfo.put(chatRoomId, info);
clock,
new InMemoryChatMessageService(messageFlux),
bufferSize));
clock,
new InMemoryChatMessageService(messageFlux),
bufferSize));
+ })
+ .then()
+ .doOnSuccess(empty -> log.info("Restored {}", this))
+ .doOnError(throwable -> log.error("Could not restore {}", this))
+ .block();
+
this.clock = clock;
this.bufferSize = bufferSize;
}
this.clock = clock;
this.bufferSize = bufferSize;
}