@Autowired
ChatRoomChannel chatRoomChannel;
@Autowired
- Consumer<String, AbstractMessageTo> chatMessageChannelConsumer;
+ Consumer<String, AbstractMessageTo> chatRoomChannelConsumer;
- CompletableFuture<Void> chatMessageChannelConsumerJob;
+ CompletableFuture<Void> chatRoomChannelConsumerJob;
@Override
public void run(ApplicationArguments args) throws Exception
{
- log.info("Starting the consumer for the ChatMessageChannel");
- chatMessageChannelConsumerJob = taskExecutor
+ log.info("Starting the consumer for the ChatRoomChannel");
+ chatRoomChannelConsumerJob = taskExecutor
.submitCompletable(chatRoomChannel)
.exceptionally(e ->
{
- log.error("The consumer for the ChatMessageChannel exited abnormally!", e);
+ log.error("The consumer for the ChatRoomChannel exited abnormally!", e);
return null;
});
}
@PreDestroy
- public void joinChatMessageChannelConsumerJob()
+ public void joinChatRoomChannelConsumerJob()
{
log.info("Signaling the consumer of the CahtRoomChannel to quit its work");
- chatMessageChannelConsumer.wakeup();
- log.info("Waiting for the consumer of the ChatMessageChannel to finish its work");
- chatMessageChannelConsumerJob.join();
- log.info("Joined the consumer of the ChatMessageChannel");
+ chatRoomChannelConsumer.wakeup();
+ log.info("Waiting for the consumer of the ChatRoomChannel to finish its work");
+ chatRoomChannelConsumerJob.join();
+ log.info("Joined the consumer of the ChatRoomChannel");
}
}
}
@Bean
- ChatRoomChannel chatMessageChannel(
+ ChatRoomChannel chatRoomChannel(
ChatBackendProperties properties,
- Producer<String, AbstractMessageTo> chatMessageChannelProducer,
- Consumer<String, AbstractMessageTo> chatMessageChannelConsumer,
+ Producer<String, AbstractMessageTo> chatRoomChannelProducer,
+ Consumer<String, AbstractMessageTo> chatRoomChannelConsumer,
ZoneId zoneId,
Clock clock)
{
return new ChatRoomChannel(
properties.getKafka().getMessageChannelTopic(),
- chatMessageChannelProducer,
- chatMessageChannelConsumer,
+ chatRoomChannelProducer,
+ chatRoomChannelConsumer,
zoneId,
properties.getKafka().getNumPartitions(),
properties.getChatroomBufferSize(),
}
@Bean
- Producer<String, AbstractMessageTo> chatMessageChannelProducer(
+ Producer<String, AbstractMessageTo> chatRoomChannelProducer(
Properties defaultProducerProperties,
ChatBackendProperties chatBackendProperties,
StringSerializer stringSerializer,
}
@Bean
- Consumer<String, AbstractMessageTo> chatMessageChannelConsumer(
+ Consumer<String, AbstractMessageTo> chatRoomChannelConsumer(
Properties defaultConsumerProperties,
ChatBackendProperties chatBackendProperties,
StringDeserializer stringDeserializer,