{
@Bean
ChatHome kafkaChatHome(
- ShardingStrategy shardingStrategy,
+ KafkaLikeShardingStrategy shardingStrategy,
ChatMessageChannel chatMessageChannel)
{
return new KafkaChatHome(shardingStrategy, chatMessageChannel);
}
@Bean
- KafkaChatRoomFactory chatRoomFactory(ChatRoomChannel chatRoomChannel)
+ KafkaChatRoomFactory chatRoomFactory(ChatMessageChannel chatMessageChannel)
{
- return new KafkaChatRoomFactory(chatRoomChannel);
- }
-
- @Bean
- ChatRoomChannel chatRoomChannel(
- ChatBackendProperties properties,
- Producer<Integer, CreateChatRoomRequestTo> chatRoomChannelProducer,
- Consumer<Integer, CreateChatRoomRequestTo> chatRoomChannelConsumer,
- ShardingStrategy shardingStrategy,
- ChatMessageChannel chatMessageChannel,
- Clock clock)
- {
- return new ChatRoomChannel(
- properties.getKafka().getChatroomChannelTopic(),
- chatRoomChannelProducer,
- chatRoomChannelConsumer,
- shardingStrategy,
- chatMessageChannel,
- clock,
- properties.getChatroomBufferSize());
+ return new KafkaChatRoomFactory(chatMessageChannel);
}
@Bean
}
@Bean
- IntegerSerializer integerSerializer()
- {
- return new IntegerSerializer();
- }
-
- @Bean
- JsonSerializer<CreateChatRoomRequestTo> chatRoomSerializer()
- {
- JsonSerializer<CreateChatRoomRequestTo> serializer = new JsonSerializer<>();
- serializer.configure(
- Map.of(JsonSerializer.ADD_TYPE_INFO_HEADERS, false),
- false);
- return serializer;
- }
-
- @Bean
- Consumer<Integer, CreateChatRoomRequestTo> chatRoomChannelConsumer(
- Properties defaultConsumerProperties,
- ChatBackendProperties chatBackendProperties,
- IntegerDeserializer integerDeserializer,
- JsonDeserializer<CreateChatRoomRequestTo> chatRoomDeserializer)
- {
- Map<String, Object> properties = new HashMap<>();
- defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
- properties.put(
- ConsumerConfig.CLIENT_ID_CONFIG,
- chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_CONSUMER");
- properties.put(
- ConsumerConfig.GROUP_ID_CONFIG,
- "chat_room_channel");
- return new KafkaConsumer<>(
- properties,
- integerDeserializer,
- chatRoomDeserializer);
- }
-
- @Bean
- IntegerDeserializer integerDeserializer()
- {
- return new IntegerDeserializer();
- }
-
- @Bean
- JsonDeserializer<CreateChatRoomRequestTo> chatRoomDeserializer()
- {
- JsonDeserializer<CreateChatRoomRequestTo> deserializer = new JsonDeserializer<>();
- deserializer.configure(
- Map.of(
- JsonDeserializer.USE_TYPE_INFO_HEADERS, false,
- JsonDeserializer.VALUE_DEFAULT_TYPE, CreateChatRoomRequestTo.class,
- JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName()),
- false );
- return deserializer;
- }
-
- @Bean
- ShardingStrategy shardingStrategy(ChatBackendProperties properties)
+ KafkaLikeShardingStrategy shardingStrategy(ChatBackendProperties properties)
{
return new KafkaLikeShardingStrategy(properties.getKafka().getNumPartitions());
}
ChatBackendProperties properties,
Producer<String, AbstractTo> chatMessageChannelProducer,
Consumer<String, AbstractTo> chatMessageChannelConsumer,
- ZoneId zoneId)
+ ZoneId zoneId,
+ Clock clock)
{
return new ChatMessageChannel(
properties.getKafka().getMessageChannelTopic(),
chatMessageChannelProducer,
chatMessageChannelConsumer,
zoneId,
- properties.getKafka().getNumPartitions());
+ properties.getKafka().getNumPartitions(),
+ properties.getChatroomBufferSize(),
+ clock);
}
@Bean