import java.time.*;
import java.util.*;
+import java.util.function.Function;
import java.util.stream.IntStream;
private final long[] currentOffset;
private final long[] nextOffset;
private final Map<UUID, ChatRoomData>[] chatRoomData;
+ private final InfoChannel infoChannel;
private boolean running;
@Getter
ZoneId zoneId,
int numShards,
int bufferSize,
- Clock clock)
+ Clock clock,
+ InfoChannel infoChannel)
{
log.debug(
"Creating DataChannel for topic {} with {} partitions",
this.chatRoomData = new Map[numShards];
IntStream
.range(0, numShards)
- .forEach(shard ->
- {
- this.chatRoomData[shard] = new HashMap<>();
- });
+ .forEach(shard -> this.chatRoomData[shard] = new HashMap<>());
+ this.infoChannel = infoChannel;
}
Message.MessageKey key = Message.MessageKey.of(chatMessageTo.getUser(), chatMessageTo.getId());
Message message = new Message(key, offset, timestamp, chatMessageTo.getText());
- ChatRoomData chatRoomData = this.chatRoomData[partition].computeIfAbsent(
- chatRoomId,
- (id) ->
- {
- log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize);
- KafkaChatMessageService service = new KafkaChatMessageService(this, id);
- return new ChatRoomData(clock, service, bufferSize);
- });
+ ChatRoomData chatRoomData = this
+ .chatRoomData[partition]
+ .computeIfAbsent(chatRoomId, this::computeChatRoomData);
KafkaChatMessageService kafkaChatRoomService =
(KafkaChatMessageService) chatRoomData.getChatRoomService();
return Mono.error(new ShardNotOwnedException(shard));
}
- return Mono.justOrEmpty(chatRoomData[shard].get(id));
+ return infoChannel
+ .getChatRoomInfo(id)
+ .map(chatRoomInfo ->
+ chatRoomData[shard].computeIfAbsent(id, this::computeChatRoomData));
+ }
+
+ private ChatRoomData computeChatRoomData(UUID chatRoomId)
+ {
+ log.info("Creating ChatRoom {} with buffer-size {}", chatRoomId, bufferSize);
+ KafkaChatMessageService service = new KafkaChatMessageService(this, chatRoomId);
+ return new ChatRoomData(clock, service, bufferSize);
}
}