private final long[] startOffset;
private final long[] currentOffset;
private final Map<UUID, ChatRoomInfo> chatRoomInfo;
+ private final DataChannel dataChannel;
private boolean running;
public InfoChannel(
String topic,
Producer<String, AbstractMessageTo> producer,
- Consumer<String, AbstractMessageTo> infoChannelConsumer)
+ Consumer<String, AbstractMessageTo> infoChannelConsumer,
+ DataChannel dataChannel)
{
log.debug(
"Creating InfoChannel for topic {}",
.entrySet()
.stream()
.forEach(entry -> this.startOffset[entry.getKey().partition()] = entry.getValue());
+
+ this.dataChannel = dataChannel;
}
- Mono<Boolean> loadInProgress()
+ boolean loadInProgress()
{
- return Mono
- .fromSupplier(() -> IntStream
- .range(0, numShards)
- .anyMatch(partition -> currentOffset[partition] < startOffset[partition]));
+ return IntStream
+ .range(0, numShards)
+ .anyMatch(partition -> currentOffset[partition] < startOffset[partition]);
}
Mono<ChatRoomInfo> sendChatRoomCreatedEvent(
chatRoomId);
this.chatRoomInfo.put(chatRoomId, chatRoomInfo);
+ this.dataChannel.createChatRoom(chatRoomInfo);
}
}
import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.PartitionInfo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
@Autowired
ConfigurableApplicationContext context;
+ @Autowired
+ InfoChannel infoChannel;
@Autowired
DataChannel dataChannel;
@Autowired
- Consumer<String, AbstractMessageTo> chatRoomChannelConsumer;
+ Consumer<String, AbstractMessageTo> infoChannelConsumer;
+ @Autowired
+ Consumer<String, AbstractMessageTo> dataChannelConsumer;
- CompletableFuture<Void> chatRoomChannelConsumerJob;
+ CompletableFuture<Void> infoChannelConsumerJob;
+ CompletableFuture<Void> dataChannelConsumerJob;
@Override
public void run(ApplicationArguments args) throws Exception
{
+ String infoTopic = properties.getKafka().getInfoChannelTopic();
+ List< PartitionInfo> partitions =
+ infoChannelConsumer.partitionsFor(infoTopic);
+ infoChannelConsumer.assignment(partitions);
+ log.info("Starting the consumer for the InfoChannel");
+ infoChannelConsumerJob = taskExecutor
+ .submitCompletable(infoChannel)
+ .exceptionally(e ->
+ {
+ log.error("The consumer for the InfoChannel exited abnormally!", e);
+ return null;
+ });
+
+ while ()
List<String> topics = List.of(properties.getKafka().getDataChannelTopic());
- chatRoomChannelConsumer.subscribe(topics, dataChannel);
+ dataChannelConsumer.subscribe(topics, dataChannel);
log.info("Starting the consumer for the ChatRoomChannel");
- chatRoomChannelConsumerJob = taskExecutor
+ dataChannelConsumerJob = taskExecutor
.submitCompletable(dataChannel)
.exceptionally(e ->
{
public void joinChatRoomChannelConsumerJob()
{
log.info("Signaling the consumer of the CahtRoomChannel to quit its work");
- chatRoomChannelConsumer.wakeup();
+ infoChannelConsumer.wakeup();
log.info("Waiting for the consumer of the ChatRoomChannel to finish its work");
- chatRoomChannelConsumerJob.join();
+ dataChannelConsumerJob.join();
log.info("Joined the consumer of the ChatRoomChannel");
}
}