private final long[] startOffset;
private final long[] currentOffset;
private final Map<UUID, ChatRoomInfo> chatRoomInfo;
+ private final DataChannel dataChannel;
private boolean running;
public InfoChannel(
String topic,
Producer<String, AbstractMessageTo> producer,
- Consumer<String, AbstractMessageTo> infoChannelConsumer)
+ Consumer<String, AbstractMessageTo> infoChannelConsumer,
+ DataChannel dataChannel)
{
log.debug(
"Creating InfoChannel for topic {}",
.entrySet()
.stream()
.forEach(entry -> this.startOffset[entry.getKey().partition()] = entry.getValue());
+
+ this.dataChannel = dataChannel;
}
- Mono<Boolean> loadInProgress()
+ boolean loadInProgress()
{
- return Mono
- .fromSupplier(() -> IntStream
- .range(0, numShards)
- .anyMatch(partition -> currentOffset[partition] < startOffset[partition]));
+ return IntStream
+ .range(0, numShards)
+ .anyMatch(partition -> currentOffset[partition] < startOffset[partition]);
}
Mono<ChatRoomInfo> sendChatRoomCreatedEvent(
chatRoomId);
this.chatRoomInfo.put(chatRoomId, chatRoomInfo);
+ this.dataChannel.createChatRoom(chatRoomInfo);
}
}
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.TopicPartition;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
private final Consumer<String, AbstractMessageTo> chatRoomChannelConsumer;
private final WorkAssignor workAssignor;
- CompletableFuture<Void> chatRoomChannelConsumerJob;
+ CompletableFuture<Void> infoChannelConsumerJob;
+ CompletableFuture<Void> dataChannelConsumerJob;
@Override
public void run(ApplicationArguments args) throws Exception
{
+ String infoTopic = properties.getKafka().getInfoChannelTopic();
+ List<TopicPartition> partitions = infoChannelConsumer
+ .partitionsFor(infoTopic)
+ .stream()
+ .map(partitionInfo -> new TopicPartition(
+ infoTopic,
+ partitionInfo.partition()))
+ .toList();
+ infoChannelConsumer.assign(partitions);
+ log.info("Starting the consumer for the InfoChannel");
+ infoChannelConsumerJob = taskExecutor
+ .submitCompletable(infoChannel)
+ .exceptionally(e ->
+ {
+ log.error("The consumer for the InfoChannel exited abnormally!", e);
+ return null;
+ });
+
+ while (infoChannel.loadInProgress())
+ {
+ log.info("InfoChannel is still loading...");
+ Thread.sleep(1000);
+ }
+
workAssignor.assignWork(chatRoomChannelConsumer);
- log.info("Starting the consumer for the ChatRoomChannel");
- chatRoomChannelConsumerJob = taskExecutor
+ log.info("Starting the consumer for the DataChannel");
+ dataChannelConsumerJob = taskExecutor
.submitCompletable(dataChannel)
.exceptionally(e ->
{
- log.error("The consumer for the ChatRoomChannel exited abnormally!", e);
+ log.error("The consumer for the DataChannel exited abnormally!", e);
return null;
});
}
public void joinChatRoomChannelConsumerJob()
{
log.info("Signaling the consumer of the CahtRoomChannel to quit its work");
- chatRoomChannelConsumer.wakeup();
+ infoChannelConsumer.wakeup();
log.info("Waiting for the consumer of the ChatRoomChannel to finish its work");
- chatRoomChannelConsumerJob.join();
+ dataChannelConsumerJob.join();
log.info("Joined the consumer of the ChatRoomChannel");
}