-package de.juplo.kafka.chat.backend.persistence.inmemory;
+package de.juplo.kafka.chat.backend.implementation.inmemory;
import de.juplo.kafka.chat.backend.domain.*;
import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException;
-import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
+import de.juplo.kafka.chat.backend.implementation.StorageStrategy;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
private final Map<UUID, ChatRoomInfo> chatRoomInfo;
private final Map<UUID, ChatRoomData> chatRoomData;
private final Clock clock;
- private final int bufferSize;
+ private final int historyLimit;
public SimpleChatHomeService(
- StorageStrategy storageStrategy,
Clock clock,
- int bufferSize)
+ int historyLimit)
{
this(
null,
- storageStrategy,
clock,
- bufferSize);
+ historyLimit);
}
public SimpleChatHomeService(
Integer shard,
- StorageStrategy storageStrategy,
Clock clock,
- int bufferSize)
+ int historyLimit)
{
- log.info("Created SimpleChatHome for shard {}", shard);
-;
+ log.debug("Creating SimpleChatHomeService");
+
this.shard = shard;
this.chatRoomInfo = new HashMap<>();
this.chatRoomData = new HashMap<>();
- storageStrategy
+ this.clock = clock;
+ this.historyLimit = historyLimit;
+ }
+
+
+ Mono<Void> restore(StorageStrategy storageStrategy)
+ {
+ chatRoomInfo.clear();
+ chatRoomData.clear();
+
+ return storageStrategy
.readChatRoomInfo()
.filter(info ->
{
return false;
}
})
- .toStream()
- .forEach(info ->
+ .flatMap(info ->
{
UUID chatRoomId = info.getId();
+ InMemoryChatMessageService chatMessageService =
+ new InMemoryChatMessageService(chatRoomId);
+
chatRoomInfo.put(chatRoomId, info);
- Flux<Message> messageFlux =
- storageStrategy.readChatRoomData(chatRoomId);
chatRoomData.put(
info.getId(),
new ChatRoomData(
clock,
- new InMemoryChatRoomService(messageFlux),
- bufferSize));
- });
- this.clock = clock;
- this.bufferSize = bufferSize;
+ chatMessageService,
+ historyLimit));
+
+ return chatMessageService.restore(storageStrategy);
+ })
+ .count()
+ .doOnSuccess(count -> log.info("Restored {} with {} chat-rooms", this, count))
+ .doOnError(throwable -> log.error("Could not restore {}", this))
+ .then();
}
@Override
public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
{
- log.info("Creating ChatRoom with buffer-size {}", bufferSize);
- ChatRoomService service = new InMemoryChatRoomService(Flux.empty());
+ log.info("Creating ChatRoom with history-limit {}", historyLimit);
+ ChatMessageService service = new InMemoryChatMessageService(id);
ChatRoomInfo chatRoomInfo = new ChatRoomInfo(id, name, shard);
this.chatRoomInfo.put(id, chatRoomInfo);
- ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize);
+ ChatRoomData chatRoomData = new ChatRoomData(clock, service, historyLimit);
this.chatRoomData.put(id, chatRoomData);
return Mono.just(chatRoomInfo);
}
.switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id)));
}
- public Flux<ChatRoomData> getChatRoomData()
+ @Override
+ public Mono<String[]> getShardOwners()
+ {
+ return Mono.empty();
+ }
+
+ @Override
+ public String toString()
{
- return Flux.fromIterable(chatRoomData.values());
+ return SimpleChatHomeService.class.getSimpleName() + ", shard=" + shard;
}
}