private final ConsumerTaskExecutor infoChannelConsumerTaskExecutor;
private final ConsumerTaskExecutor dataChannelConsumerTaskExecutor;
- private final InfoChannel infoChannel;
-
- void run() throws InterruptedException
+ void run()
{
infoChannelConsumerTaskExecutor.executeConsumerTask();
-
- while (infoChannel.loadInProgress())
- {
- log.info("InfoChannel is still loading...");
- Thread.sleep(1000);
- }
-
dataChannelConsumerTaskExecutor.executeConsumerTask();
}
}
}
- void createChatRoom(ChatRoomInfo chatRoomInfo)
- {
- if (!isShardOwned[chatRoomInfo.getShard()])
- {
- log.debug("Ignoring not owned chat-room {}", chatRoomInfo);
- return;
- }
-
- UUID id = chatRoomInfo.getId();
- log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize);
- KafkaChatMessageService service = new KafkaChatMessageService(this, id);
- ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize);
- putChatRoom(
- chatRoomInfo.getId(),
- chatRoomInfo.getName(),
- chatRoomInfo.getShard(),
- chatRoomData);
- }
-
private void loadChatMessage(
UUID chatRoomId,
LocalDateTime timestamp,
Message.MessageKey key = Message.MessageKey.of(chatMessageTo.getUser(), chatMessageTo.getId());
Message message = new Message(key, offset, timestamp, chatMessageTo.getText());
- ChatRoomData chatRoomData = this.chatRoomData[partition].get(chatRoomId);
+ ChatRoomData chatRoomData = this.chatRoomData[partition].computeIfAbsent(
+ chatRoomId,
+ (id) ->
+ {
+ log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize);
+ KafkaChatMessageService service = new KafkaChatMessageService(this, id);
+ return new ChatRoomData(clock, service, bufferSize);
+ });
KafkaChatMessageService kafkaChatRoomService =
(KafkaChatMessageService) chatRoomData.getChatRoomService();
}
- private void putChatRoom(
- UUID chatRoomId,
- String name,
- Integer partition,
- ChatRoomData chatRoomData)
- {
- if (this.chatRoomData[partition].containsKey(chatRoomId))
- {
- log.warn(
- "Ignoring existing chat-room for {}: {}",
- partition,
- chatRoomId);
- }
- else
- {
- log.info(
- "Adding new chat-room to partition {}: {}",
- partition,
- chatRoomData);
-
- this.chatRoomData[partition].put(chatRoomId, chatRoomData);
- }
- }
-
int[] getOwnedShards()
{
return IntStream
private final long[] startOffset;
private final long[] nextOffset;
private final Map<UUID, ChatRoomInfo> chatRoomInfo;
- private final DataChannel dataChannel;
private boolean running;
public InfoChannel(
String topic,
Producer<String, AbstractMessageTo> producer,
- Consumer<String, AbstractMessageTo> infoChannelConsumer,
- DataChannel dataChannel)
+ Consumer<String, AbstractMessageTo> infoChannelConsumer)
{
log.debug(
"Creating InfoChannel for topic {}",
IntStream
.range(0, numShards)
.forEach(partition -> this.nextOffset[partition] = -1l);
-
- this.dataChannel = dataChannel;
}
chatRoomId);
this.chatRoomInfo.put(chatRoomId, chatRoomInfo);
- this.dataChannel.createChatRoom(chatRoomInfo);
}
}
@Bean
ConsumerTaskRunner consumerTaskRunner(
ConsumerTaskExecutor infoChannelConsumerTaskExecutor,
- ConsumerTaskExecutor dataChannelConsumerTaskExecutor,
- InfoChannel infoChannel)
+ ConsumerTaskExecutor dataChannelConsumerTaskExecutor)
{
return new ConsumerTaskRunner(
infoChannelConsumerTaskExecutor,
- dataChannelConsumerTaskExecutor,
- infoChannel);
+ dataChannelConsumerTaskExecutor);
}
@Bean
InfoChannel infoChannel(
ChatBackendProperties properties,
Producer<String, AbstractMessageTo> producer,
- Consumer<String, AbstractMessageTo> infoChannelConsumer,
- DataChannel dataChannel)
+ Consumer<String, AbstractMessageTo> infoChannelConsumer)
{
return new InfoChannel(
properties.getKafka().getInfoChannelTopic(),
producer,
- infoChannelConsumer,
- dataChannel);
+ infoChannelConsumer);
}
@Bean