- @Bean
- ProducerFactory<?, ?> producerFactory(Properties streamProcessorProperties)
- {
- Map<String, Object> propertyMap = convertToMap(streamProcessorProperties);
-
- propertyMap.put(
- ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
- JsonSerializer.class.getName());
- propertyMap.put(
- ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
- JsonSerializer.class.getName());
-
- return new DefaultKafkaProducerFactory<>(propertyMap);
- }
-
- @Bean
- ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
- Properties streamProcessorProperties)
- {
- Map<String, Object> propertyMap = convertToMap(streamProcessorProperties);
-
- propertyMap.put(
- ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
- JsonDeserializer.class.getName());
- propertyMap.put(
- ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
- JsonDeserializer.class.getName());
-
- ConsumerFactory<? super Object, ? super Object> consumerFactory =
- new DefaultKafkaConsumerFactory<>(propertyMap);
-
- ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory =
- new ConcurrentKafkaListenerContainerFactory<>();
-
- kafkaListenerContainerFactory.setConsumerFactory(consumerFactory);
-
- return kafkaListenerContainerFactory;
- }
-