+ @Bean
+ KafkaConsumer<String, String> consumer(TransferServiceProperties properties)
+ {
+ Properties props = new Properties();
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.bootstrapServers);
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, properties.groupId);
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+
+ return new KafkaConsumer<>(props);
+ }
+
+ @Bean(destroyMethod = "shutdown")
+ ExecutorService executorService()
+ {
+ return Executors.newFixedThreadPool(1);
+ }
+
+ @Bean(destroyMethod = "shutdown")
+ TransferConsumer transferConsumer(
+ TransferServiceProperties properties,
+ KafkaConsumer<String, String> consumer,
+ ExecutorService executorService,
+ ObjectMapper mapper,
+ TransferService transferService)
+ {
+ TransferConsumer transferConsumer =
+ new TransferConsumer(properties.topic, consumer, executorService, mapper, transferService);
+ transferConsumer.start();
+ return transferConsumer;
+ }
+