Springify: GRÜN - Unerwartetes Verhalten lag an Konfigurations-Fehler
[demos/kafka/training] / src / main / java / de / juplo / kafka / ApplicationConfiguration.java
1 package de.juplo.kafka;
2
3 import org.apache.kafka.clients.consumer.ConsumerRecord;
4 import org.springframework.boot.context.properties.EnableConfigurationProperties;
5 import org.springframework.context.annotation.Bean;
6 import org.springframework.context.annotation.Configuration;
7 import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
8 import org.springframework.kafka.config.KafkaListenerContainerFactory;
9 import org.springframework.kafka.core.ConsumerFactory;
10 import org.springframework.kafka.listener.CommonContainerStoppingErrorHandler;
11 import org.springframework.kafka.listener.CommonErrorHandler;
12
13 import java.util.function.Consumer;
14
15
16 @Configuration
17 @EnableConfigurationProperties(ApplicationProperties.class)
18 public class ApplicationConfiguration
19 {
20   @Bean
21   public Consumer<ConsumerRecord<String, Long>> consumer()
22   {
23     return (record) ->
24     {
25       // Handle record
26     };
27   }
28
29   @Bean
30   public KafkaListenerContainerFactory<?> batchFactory(
31       ConsumerFactory<String, Long> consumerFactory,
32       CommonErrorHandler errorHandler)
33   {
34     ConcurrentKafkaListenerContainerFactory<String, Long> factory =
35         new ConcurrentKafkaListenerContainerFactory<>();
36
37     factory.setConsumerFactory(consumerFactory);
38     factory.setCommonErrorHandler(errorHandler);
39     factory.setBatchListener(true);
40
41     return factory;
42   }
43
44   @Bean
45   public CommonContainerStoppingErrorHandler errorHandler()
46   {
47     return new CommonContainerStoppingErrorHandler();
48   }
49 }