+ @Bean
+ Producer<Integer, ChatRoomTo> chatRoomChannelProducer(
+ Properties producerProperties,
+ IntegerSerializer integerSerializer,
+ JsonSerializer<ChatRoomTo> chatRoomSerializer)
+ {
+ return new KafkaProducer<>(
+ producerProperties,
+ integerSerializer,
+ chatRoomSerializer);
+ }
+
+ @Bean
+ IntegerSerializer integerSerializer()
+ {
+ return new IntegerSerializer();
+ }
+
+ @Bean
+ JsonSerializer<ChatRoomTo> chatRoomSerializer()
+ {
+ JsonSerializer<ChatRoomTo> serializer = new JsonSerializer<>();
+ return serializer;
+ }
+
+ @Bean
+ Consumer<Integer, ChatRoomTo> chatRoomChannelConsumer(
+ Properties producerProperties,
+ IntegerDeserializer integerDeserializer,
+ JsonDeserializer<ChatRoomTo> chatRoomDeserializer)
+ {
+ return new KafkaConsumer<>(
+ producerProperties,
+ integerDeserializer,
+ chatRoomDeserializer);
+ }
+
+ @Bean
+ IntegerDeserializer integerDeserializer()
+ {
+ return new IntegerDeserializer();
+ }
+
+ @Bean
+ JsonDeserializer<ChatRoomTo> chatRoomDeserializer()
+ {
+ JsonDeserializer<ChatRoomTo> deserializer = new JsonDeserializer<>();
+ return deserializer;
+ }
+
+ @Bean
+ ShardingStrategy shardingStrategy(ChatBackendProperties properties)
+ {
+ return new KafkaLikeShardingStrategy(properties.getKafka().getNumPartitions());
+ }
+