import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+import org.springframework.kafka.config.KafkaListenerContainerFactory;
+import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.CommonContainerStoppingErrorHandler;
+import org.springframework.kafka.listener.CommonErrorHandler;
import java.util.function.Consumer;
};
}
+ @Bean
+ public KafkaListenerContainerFactory<?> batchFactory(
+ ConsumerFactory<String, Long> consumerFactory,
+ CommonErrorHandler errorHandler)
+ {
+ ConcurrentKafkaListenerContainerFactory<String, Long> factory =
+ new ConcurrentKafkaListenerContainerFactory<>();
+
+ factory.setConsumerFactory(consumerFactory);
+ factory.setCommonErrorHandler(errorHandler);
+ factory.setBatchListener(true);
+
+ return factory;
+ }
+
@Bean
public CommonContainerStoppingErrorHandler errorHandler()
{