+ return new KafkaProducer<>(
+ defaultProducerProperties,
+ stringSerializer,
+ messageSerializer);
+ }
+
+ @Bean
+ StringSerializer stringSerializer()
+ {
+ return new StringSerializer();
+ }
+
+ @Bean
+ JsonSerializer<MessageTo> chatMessageSerializer()
+ {
+ JsonSerializer<MessageTo> serializer = new JsonSerializer<>();
+ return serializer;
+ }
+
+ @Bean
+ Consumer<String, MessageTo> chatMessageChannelConsumer(
+ Map<String, String> defaultConsumerProperties,
+ StringDeserializer stringDeserializer,
+ JsonDeserializer<MessageTo> messageDeserializer)
+ {
+ Map<String, String> properties = new HashMap<>();
+ properties.put(
+ ConsumerConfig.GROUP_ID_CONFIG,
+ "chat_message_channel");
+ return new KafkaConsumer<>(
+ properties,
+ stringDeserializer,
+ messageDeserializer);
+ }
+
+ @Bean
+ StringDeserializer stringDeserializer()
+ {
+ return new StringDeserializer();
+ }
+
+ @Bean
+ JsonDeserializer<MessageTo> chatMessageDeserializer()
+ {
+ JsonDeserializer<MessageTo> deserializer = new JsonDeserializer<>();
+ return deserializer;
+ }
+
+ @Bean
+ Map<String, String> defaultProducerProperties(ChatBackendProperties chatBackendProperties)
+ {
+ return Map.of(
+ ProducerConfig.CLIENT_ID_CONFIG,
+ chatBackendProperties.getKafka().getClientId(),
+ ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
+ chatBackendProperties.getKafka().getBootstrapServers());
+ }
+
+ @Bean
+ Map<String, String> defaultConsumerProperties(ChatBackendProperties chatBackendProperties)
+ {
+ return Map.of(
+ ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
+ chatBackendProperties.getKafka().getBootstrapServers(),
+ ConsumerConfig.CLIENT_ID_CONFIG,
+ chatBackendProperties.getKafka().getClientId(),
+ ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
+ "false",
+ ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+ "earliest");