-package de.juplo.kafka.chat.backend.domain;
+package de.juplo.kafka.chat.backend.persistence.inmemory;
+import de.juplo.kafka.chat.backend.domain.*;
+import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import java.time.Clock;
import java.util.*;
@Slf4j
public class SimpleChatHome implements ChatHome
{
- private final ChatHomeService service;
- private final int shard;
+ private final Integer shard;
+ private final Map<UUID, ChatRoomInfo> chatRoomInfo;
+ private final Map<UUID, ChatRoomData> chatRoomData;
+ private final Clock clock;
+ private final int bufferSize;
- public SimpleChatHome(ChatHomeService service, int shard)
+
+ public SimpleChatHome(
+ StorageStrategy storageStrategy,
+ Clock clock,
+ int bufferSize)
+ {
+ this(
+ null,
+ storageStrategy,
+ clock,
+ bufferSize);
+ }
+
+ public SimpleChatHome(
+ Integer shard,
+ StorageStrategy storageStrategy,
+ Clock clock,
+ int bufferSize)
{
log.info("Created SimpleChatHome for shard {}", shard);
- this.service = service;
+;
this.shard = shard;
+ this.chatRoomInfo = new HashMap<>();
+ this.chatRoomData = new HashMap<>();
+ storageStrategy
+ .readChatRoomInfo()
+ .filter(info ->
+ {
+ if (shard == null || info.getShard() == shard)
+ {
+ return true;
+ }
+ else
+ {
+ log.info(
+ "SimpleChatHome for shard {} ignores not owned chat-room {}",
+ shard,
+ info);
+ return false;
+ }
+ })
+ .toStream()
+ .forEach(info ->
+ {
+ UUID chatRoomId = info.getId();
+ chatRoomInfo.put(chatRoomId, info);
+ Flux<Message> messageFlux =
+ storageStrategy.readChatRoomData(chatRoomId);
+ chatRoomData.put(
+ info.getId(),
+ new ChatRoomData(
+ clock,
+ new InMemoryChatRoomService(messageFlux),
+ bufferSize));
+ });
+ this.clock = clock;
+ this.bufferSize = bufferSize;
}
- public SimpleChatHome(ChatHomeService service)
+
+ @Override
+ public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
{
- this(service, 0);
+ log.info("Creating ChatRoom with buffer-size {}", bufferSize);
+ ChatRoomService service = new InMemoryChatRoomService(Flux.empty());
+ ChatRoomInfo chatRoomInfo = new ChatRoomInfo(id, name, shard);
+ this.chatRoomInfo.put(id, chatRoomInfo);
+ ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize);
+ this.chatRoomData.put(id, chatRoomData);
+ return Mono.just(chatRoomInfo);
}
-
@Override
- public Mono<ChatRoom> getChatRoom(UUID id)
+ public Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
{
- return service
- .getChatRoom(shard, id)
+ return Mono
+ .justOrEmpty(chatRoomInfo.get(id))
.switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id)));
}
@Override
- public Flux<ChatRoom> getChatRooms()
+ public Flux<ChatRoomInfo> getChatRoomInfo()
+ {
+ return Flux.fromIterable(chatRoomInfo.values());
+ }
+
+ @Override
+ public Mono<ChatRoomData> getChatRoomData(UUID id)
+ {
+ return Mono
+ .justOrEmpty(chatRoomData.get(id))
+ .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id)));
+ }
+
+ public Flux<ChatRoomData> getChatRoomData()
{
- return service.getChatRooms(shard);
+ return Flux.fromIterable(chatRoomData.values());
}
}