private final long[] currentOffset;
private final long[] nextOffset;
private final Map<UUID, ChatRoomData>[] chatRoomData;
+ private final InfoChannel infoChannel;
private boolean running;
@Getter
ZoneId zoneId,
int numShards,
int bufferSize,
- Clock clock)
+ Clock clock,
+ InfoChannel infoChannel)
{
log.debug(
"Creating DataChannel for topic {} with {} partitions",
this.chatRoomData = new Map[numShards];
IntStream
.range(0, numShards)
- .forEach(shard ->
- {
- this.chatRoomData[shard] = new HashMap<>();
- });
+ .forEach(shard -> this.chatRoomData[shard] = new HashMap<>());
+ this.infoChannel = infoChannel;
}
return Mono.error(new ShardNotOwnedException(shard));
}
- return Mono.justOrEmpty(chatRoomData[shard].get(id));
+ return infoChannel
+ .getChatRoomInfo(id)
+ .map(chatRoomInfo -> chatRoomData[shard].computeIfAbsent(
+ id,
+ (chatRoomId) ->
+ {
+ log.info("Creating ChatRoom {} with buffer-size {}", chatRoomId, bufferSize);
+ KafkaChatMessageService service = new KafkaChatMessageService(this, chatRoomId);
+ return new ChatRoomData(clock, service, bufferSize);
+ }));
}
}
if (metadata != null)
{
log.info("Successfully sent chreate-request for chat room: {}", to);
- ChatRoomInfo chatRoomInfo = new ChatRoomInfo(chatRoomId, name, record.partition());
+ ChatRoomInfo chatRoomInfo = new ChatRoomInfo(chatRoomId, name, shard);
sink.success(chatRoomInfo);
}
else
Producer<String, AbstractMessageTo> producer,
Consumer<String, AbstractMessageTo> dataChannelConsumer,
ZoneId zoneId,
- Clock clock)
+ Clock clock,
+ InfoChannel infoChannel)
{
return new DataChannel(
properties.getKafka().getDataChannelTopic(),
zoneId,
properties.getKafka().getNumPartitions(),
properties.getChatroomBufferSize(),
- clock);
+ clock,
+ infoChannel);
}
@Bean