package de.juplo.kafka.chat.backend.persistence.inmemory;
import de.juplo.kafka.chat.backend.domain.*;
+import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class SimpleChatHome implements ChatHome
{
private final Integer shard;
- private final Map<UUID, ChatRoom> chatRooms;
+ private final Map<UUID, ChatRoomInfo> chatRoomInfo;
+ private final Map<UUID, ChatRoomData> chatRoomData;
private final Clock clock;
private final int bufferSize;
public SimpleChatHome(
- Flux<ChatRoom> chatroomFlux,
+ StorageStrategy storageStrategy,
Clock clock,
int bufferSize)
{
- this(null, chatroomFlux, clock, bufferSize);
+ this(
+ null,
+ storageStrategy,
+ clock,
+ bufferSize);
}
public SimpleChatHome(
Integer shard,
- Flux<ChatRoom> chatroomFlux,
+ StorageStrategy storageStrategy,
Clock clock,
int bufferSize)
{
log.info("Created SimpleChatHome for shard {}", shard);
;
this.shard = shard;
- this.chatRooms = new HashMap<>();
- chatroomFlux
- .filter(chatRoom ->
+ this.chatRoomInfo = new HashMap<>();
+ this.chatRoomData = new HashMap<>();
+ storageStrategy
+ .readChatRoomInfo()
+ .filter(info ->
{
- if (shard == null || chatRoom.getShard() == shard)
+ if (shard == null || info.getShard() == shard)
{
return true;
}
log.info(
"SimpleChatHome for shard {} ignores not owned chat-room {}",
shard,
- chatRoom);
+ info);
return false;
}
})
.toStream()
- .forEach(chatroom -> chatRooms.put(chatroom.getId(), chatroom));
+ .forEach(info ->
+ {
+ UUID chatRoomId = info.getId();
+ chatRoomInfo.put(chatRoomId, info);
+ Flux<Message> messageFlux =
+ storageStrategy.readChatRoomData(chatRoomId);
+ chatRoomData.put(
+ info.getId(),
+ new ChatRoomData(
+ clock,
+ new InMemoryChatRoomService(messageFlux),
+ bufferSize));
+ });
this.clock = clock;
this.bufferSize = bufferSize;
}
{
log.info("Creating ChatRoom with buffer-size {}", bufferSize);
ChatRoomService service = new InMemoryChatRoomService(Flux.empty());
- ChatRoom chatRoom = new ChatRoom(id, name, shard, clock, service, bufferSize);
- chatRooms.put(id, chatRoom);
- return Mono.just(chatRoom);
+ ChatRoomInfo chatRoomInfo = new ChatRoomInfo(id, name, shard);
+ this.chatRoomInfo.put(id, chatRoomInfo);
+ ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize);
+ this.chatRoomData.put(id, chatRoomData);
+ return Mono.just(chatRoomInfo);
}
@Override
- public Mono<ChatRoom> getChatRoom(UUID id)
+ public Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
{
return Mono
- .justOrEmpty(chatRooms.get(id))
+ .justOrEmpty(chatRoomInfo.get(id))
.switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id)));
}
@Override
- public Flux<ChatRoom> getChatRooms()
+ public Flux<ChatRoomInfo> getChatRoomInfo()
+ {
+ return Flux.fromIterable(chatRoomInfo.values());
+ }
+
+ @Override
+ public Mono<ChatRoomData> getChatRoomData(UUID id)
+ {
+ return Mono
+ .justOrEmpty(chatRoomData.get(id))
+ .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id)));
+ }
+
+ public Flux<ChatRoomData> getChatRoomData()
{
- return Flux.fromIterable(chatRooms.values());
+ return Flux.fromIterable(chatRoomData.values());
}
}