private final boolean[] isShardOwned;
private final long[] currentOffset;
private final long[] nextOffset;
- private final Map<UUID, ChatRoomData>[] chatrooms;
+ private final Map<UUID, ChatRoomInfo>[] chatRoomInfo;
+ private final Map<UUID, ChatRoomData>[] chatRoomData;
private boolean running;
@Getter
this.isShardOwned = new boolean[numShards];
this.currentOffset = new long[numShards];
this.nextOffset = new long[numShards];
- this.chatrooms = new Map[numShards];
+ this.chatRoomInfo = new Map[numShards];
+ this.chatRoomData = new Map[numShards];
IntStream
.range(0, numShards)
- .forEach(shard -> this.chatrooms[shard] = new HashMap<>());
+ .forEach(shard ->
+ {
+ this.chatRoomInfo[shard] = new HashMap<>();
+ this.chatRoomData[shard] = new HashMap<>();
+ });
}
private void createChatRoom(
UUID chatRoomId,
CommandCreateChatRoomTo createChatRoomRequestTo,
- int partition)
+ Integer partition)
{
- log.info("Loading ChatRoom {} with buffer-size {}", chatRoomId, bufferSize);
+ log.info(
+ "Loading ChatRoom {} for shard {} with buffer-size {}",
+ chatRoomId,
+ partition,
+ bufferSize);
KafkaChatRoomService service = new KafkaChatRoomService(this, chatRoomId);
ChatRoomData chatRoomData = new ChatRoomData(
clock,
service,
bufferSize);
- putChatRoom(chatRoomData);
+ putChatRoom(
+ chatRoomId,
+ createChatRoomRequestTo.getName(),
+ partition,
+ chatRoomData);
}
private void createChatRoom(ChatRoomInfo chatRoomInfo)
{
UUID id = chatRoomInfo.getId();
- String name = chatRoomInfo.getName();
- int shard = chatRoomInfo.getShard();
log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize);
KafkaChatRoomService service = new KafkaChatRoomService(this, id);
ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize);
- putChatRoom(chatRoomData);
+ putChatRoom(
+ chatRoomInfo.getId(),
+ chatRoomInfo.getName(),
+ chatRoomInfo.getShard(),
+ chatRoomData);
}
private void loadChatMessage(
Message.MessageKey key = Message.MessageKey.of(chatMessageTo.getUser(), chatMessageTo.getId());
Message message = new Message(key, offset, timestamp, chatMessageTo.getText());
- ChatRoomData chatRoomData = chatrooms[partition].get(chatRoomId);
+ ChatRoomData chatRoomData = this.chatRoomData[partition].get(chatRoomId);
KafkaChatRoomService kafkaChatRoomService =
(KafkaChatRoomService) chatRoomData.getChatRoomService();
}
- private void putChatRoom(ChatRoomData chatRoomData)
+ private void putChatRoom(
+ UUID chatRoomId,
+ String name,
+ Integer partition,
+ ChatRoomData chatRoomData)
{
- Integer partition = chatRoomData.getShard();
- UUID chatRoomId = chatRoomData.getId();
- if (chatrooms[partition].containsKey(chatRoomId))
+ if (this.chatRoomInfo[partition].containsKey(chatRoomId))
{
- log.warn("Ignoring existing chat-room: " + chatRoomData);
+ log.warn(
+ "Ignoring existing chat-room for {}: {}",
+ partition,
+ chatRoomId);
}
else
{
partition,
chatRoomData);
- chatrooms[partition].put(chatRoomId, chatRoomData);
+ this.chatRoomInfo[partition].put(
+ chatRoomId,
+ new ChatRoomInfo(chatRoomId, name, partition));
+ this.chatRoomData[partition].put(chatRoomId, chatRoomData);
}
}
return Mono.error(new ShardNotOwnedException(shard));
}
- return Mono.justOrEmpty(chatrooms[shard].get(id));
+ return Mono.justOrEmpty(chatRoomData[shard].get(id));
+ }
+
+ Flux<ChatRoomInfo> getChatRoomInfo()
+ {
+ return Flux
+ .fromStream(IntStream.range(0, numShards).mapToObj(i -> Integer.valueOf(i)))
+ .filter(shard -> isShardOwned[shard])
+ .flatMap(shard -> Flux.fromIterable(chatRoomInfo[shard].values()));
+ }
+
+ Mono<ChatRoomInfo> getChatRoomInfo(int shard, UUID id)
+ {
+ if (loadInProgress)
+ {
+ return Mono.error(new LoadInProgressException());
+ }
+
+ if (!isShardOwned[shard])
+ {
+ return Mono.error(new ShardNotOwnedException(shard));
+ }
+
+ return Mono.justOrEmpty(chatRoomInfo[shard].get(id));
}
Flux<ChatRoomData> getChatRoomData()
return Flux
.fromStream(IntStream.range(0, numShards).mapToObj(i -> Integer.valueOf(i)))
.filter(shard -> isShardOwned[shard])
- .flatMap(shard -> Flux.fromIterable(chatrooms[shard].values()));
+ .flatMap(shard -> Flux.fromIterable(chatRoomData[shard].values()));
}
}