private final boolean[] isShardOwned;
private final long[] currentOffset;
private final long[] nextOffset;
- private final Map<UUID, ChatRoom>[] chatrooms;
+ private final Map<UUID, ChatRoomInfo>[] chatRoomInfo;
+ private final Map<UUID, ChatRoomData>[] chatRoomData;
private boolean running;
@Getter
this.isShardOwned = new boolean[numShards];
this.currentOffset = new long[numShards];
this.nextOffset = new long[numShards];
- this.chatrooms = new Map[numShards];
+ this.chatRoomInfo = new Map[numShards];
+ this.chatRoomData = new Map[numShards];
IntStream
.range(0, numShards)
- .forEach(shard -> this.chatrooms[shard] = new HashMap<>());
+ .forEach(shard ->
+ {
+ this.chatRoomInfo[shard] = new HashMap<>();
+ this.chatRoomData[shard] = new HashMap<>();
+ });
}
private void createChatRoom(
UUID chatRoomId,
CommandCreateChatRoomTo createChatRoomRequestTo,
- int partition)
+ Integer partition)
{
- log.info("Loading ChatRoom {} with buffer-size {}", chatRoomId, bufferSize);
- KafkaChatRoomService service = new KafkaChatRoomService(this, chatRoomId);
- ChatRoom chatRoom = new ChatRoom(
+ log.info(
+ "Loading ChatRoom {} for shard {} with buffer-size {}",
chatRoomId,
- createChatRoomRequestTo.getName(),
partition,
+ bufferSize);
+ KafkaChatRoomService service = new KafkaChatRoomService(this, chatRoomId);
+ ChatRoomData chatRoomData = new ChatRoomData(
clock,
service,
bufferSize);
- putChatRoom(chatRoom);
+ putChatRoom(
+ chatRoomId,
+ createChatRoomRequestTo.getName(),
+ partition,
+ chatRoomData);
}
private void createChatRoom(ChatRoomInfo chatRoomInfo)
{
UUID id = chatRoomInfo.getId();
- String name = chatRoomInfo.getName();
- int shard = chatRoomInfo.getShard();
log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize);
KafkaChatRoomService service = new KafkaChatRoomService(this, id);
- ChatRoom chatRoom = new ChatRoom(id, name, shard, clock, service, bufferSize);
- putChatRoom(chatRoom);
+ ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize);
+ putChatRoom(
+ chatRoomInfo.getId(),
+ chatRoomInfo.getName(),
+ chatRoomInfo.getShard(),
+ chatRoomData);
}
private void loadChatMessage(
Message.MessageKey key = Message.MessageKey.of(chatMessageTo.getUser(), chatMessageTo.getId());
Message message = new Message(key, offset, timestamp, chatMessageTo.getText());
- ChatRoom chatRoom = chatrooms[partition].get(chatRoomId);
+ ChatRoomData chatRoomData = this.chatRoomData[partition].get(chatRoomId);
KafkaChatRoomService kafkaChatRoomService =
- (KafkaChatRoomService) chatRoom.getChatRoomService();
+ (KafkaChatRoomService) chatRoomData.getChatRoomService();
kafkaChatRoomService.persistMessage(message);
}
}
- private void putChatRoom(ChatRoom chatRoom)
+ private void putChatRoom(
+ UUID chatRoomId,
+ String name,
+ Integer partition,
+ ChatRoomData chatRoomData)
{
- Integer partition = chatRoom.getShard();
- UUID chatRoomId = chatRoom.getId();
- if (chatrooms[partition].containsKey(chatRoomId))
+ if (this.chatRoomInfo[partition].containsKey(chatRoomId))
{
- log.warn("Ignoring existing chat-room: " + chatRoom);
+ log.warn(
+ "Ignoring existing chat-room for {}: {}",
+ partition,
+ chatRoomId);
}
else
{
log.info(
"Adding new chat-room to partition {}: {}",
partition,
- chatRoom);
+ chatRoomData);
- chatrooms[partition].put(chatRoomId, chatRoom);
+ this.chatRoomInfo[partition].put(
+ chatRoomId,
+ new ChatRoomInfo(chatRoomId, name, partition));
+ this.chatRoomData[partition].put(chatRoomId, chatRoomData);
}
}
.toArray();
}
- Mono<ChatRoom> getChatRoom(int shard, UUID id)
+ Mono<ChatRoomData> getChatRoomData(int shard, UUID id)
+ {
+ if (loadInProgress)
+ {
+ return Mono.error(new LoadInProgressException());
+ }
+
+ if (!isShardOwned[shard])
+ {
+ return Mono.error(new ShardNotOwnedException(shard));
+ }
+
+ return Mono.justOrEmpty(chatRoomData[shard].get(id));
+ }
+
+ Flux<ChatRoomInfo> getChatRoomInfo()
+ {
+ return Flux
+ .fromStream(IntStream.range(0, numShards).mapToObj(i -> Integer.valueOf(i)))
+ .filter(shard -> isShardOwned[shard])
+ .flatMap(shard -> Flux.fromIterable(chatRoomInfo[shard].values()));
+ }
+
+ Mono<ChatRoomInfo> getChatRoomInfo(int shard, UUID id)
{
if (loadInProgress)
{
- throw new LoadInProgressException();
+ return Mono.error(new LoadInProgressException());
}
if (!isShardOwned[shard])
{
- throw new ShardNotOwnedException(shard);
+ return Mono.error(new ShardNotOwnedException(shard));
}
- return Mono.justOrEmpty(chatrooms[shard].get(id));
+ return Mono.justOrEmpty(chatRoomInfo[shard].get(id));
}
- Flux<ChatRoom> getChatRooms()
+ Flux<ChatRoomData> getChatRoomData()
{
return Flux
.fromStream(IntStream.range(0, numShards).mapToObj(i -> Integer.valueOf(i)))
.filter(shard -> isShardOwned[shard])
- .flatMap(shard -> Flux.fromIterable(chatrooms[shard].values()));
+ .flatMap(shard -> Flux.fromIterable(chatRoomData[shard].values()));
}
}