private final Map<UUID, ChatRoomInfo> chatRoomInfo;
private final Map<UUID, ChatRoomData> chatRoomData;
private final Clock clock;
- private final int bufferSize;
+ private final int historyLimit;
public SimpleChatHomeService(
- StorageStrategy storageStrategy,
Clock clock,
- int bufferSize)
+ int historyLimit)
{
this(
null,
- storageStrategy,
clock,
- bufferSize);
+ historyLimit);
}
public SimpleChatHomeService(
Integer shard,
- StorageStrategy storageStrategy,
Clock clock,
- int bufferSize)
+ int historyLimit)
{
- log.info("Created SimpleChatHome for shard {}", shard);
-;
+ log.debug("Creating SimpleChatHomeService");
+
this.shard = shard;
this.chatRoomInfo = new HashMap<>();
this.chatRoomData = new HashMap<>();
- storageStrategy
+ this.clock = clock;
+ this.historyLimit = historyLimit;
+ }
+
+
+ Mono<Void> restore(StorageStrategy storageStrategy)
+ {
+ chatRoomInfo.clear();
+ chatRoomData.clear();
+
+ return storageStrategy
.readChatRoomInfo()
.filter(info ->
{
return false;
}
})
- .toStream()
- .forEach(info ->
+ .flatMap(info ->
{
UUID chatRoomId = info.getId();
+ InMemoryChatMessageService chatMessageService =
+ new InMemoryChatMessageService(chatRoomId);
+
chatRoomInfo.put(chatRoomId, info);
- Flux<Message> messageFlux =
- storageStrategy.readChatRoomData(chatRoomId);
- chatRoomData.put(
- info.getId(),
+ ChatRoomData chatRoomData =
new ChatRoomData(
clock,
- new InMemoryChatMessageService(messageFlux),
- bufferSize));
- });
- this.clock = clock;
- this.bufferSize = bufferSize;
+ chatMessageService,
+ historyLimit);
+ chatRoomData.activate();
+ this.chatRoomData.put(info.getId(), chatRoomData);
+
+ return chatMessageService.restore(storageStrategy);
+ })
+ .count()
+ .doOnSuccess(count -> log.info("Restored {} with {} chat-rooms", this, count))
+ .doOnError(throwable -> log.error("Could not restore {}", this))
+ .then();
}
@Override
public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
{
- log.info("Creating ChatRoom with buffer-size {}", bufferSize);
- ChatMessageService service = new InMemoryChatMessageService(Flux.empty());
+ log.info("Creating ChatRoom with history-limit {}", historyLimit);
+ ChatMessageService service = new InMemoryChatMessageService(id);
ChatRoomInfo chatRoomInfo = new ChatRoomInfo(id, name, shard);
this.chatRoomInfo.put(id, chatRoomInfo);
- ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize);
+ ChatRoomData chatRoomData = new ChatRoomData(clock, service, historyLimit);
+ chatRoomData.activate();
this.chatRoomData.put(id, chatRoomData);
return Mono.just(chatRoomInfo);
}
.justOrEmpty(chatRoomData.get(id))
.switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id)));
}
+
+ @Override
+ public Mono<String[]> getShardOwners()
+ {
+ return Mono.empty();
+ }
+
+ @Override
+ public String toString()
+ {
+ return SimpleChatHomeService.class.getSimpleName() + ", shard=" + shard;
+ }
}