+ @Bean
+ ProducerFactory<?, ?> producerFactory(Properties streamProcessorProperties)
+ {
+ Map<String, Object> propertyMap = convertToMap(streamProcessorProperties);
+
+ propertyMap.put(
+ ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+ JsonSerializer.class.getName());
+ propertyMap.put(
+ ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+ JsonSerializer.class.getName());
+
+ return new DefaultKafkaProducerFactory<>(propertyMap);
+ }
+
+ @Bean
+ ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
+ Properties streamProcessorProperties)
+ {
+ Map<String, Object> propertyMap = convertToMap(streamProcessorProperties);
+
+ propertyMap.put(
+ ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+ JsonDeserializer.class.getName());
+ propertyMap.put(
+ ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+ JsonDeserializer.class.getName());
+
+ ConsumerFactory<? super Object, ? super Object> consumerFactory =
+ new DefaultKafkaConsumerFactory<>(propertyMap);
+
+ ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory =
+ new ConcurrentKafkaListenerContainerFactory<>();
+
+ kafkaListenerContainerFactory.setConsumerFactory(consumerFactory);
+
+ return kafkaListenerContainerFactory;
+ }
+