1 package de.juplo.kafka;
3 import org.apache.kafka.clients.consumer.ConsumerRecord;
4 import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
5 import org.springframework.boot.context.properties.EnableConfigurationProperties;
6 import org.springframework.context.annotation.Bean;
7 import org.springframework.context.annotation.Configuration;
8 import org.springframework.kafka.core.ConsumerFactory;
10 import java.util.concurrent.ExecutorService;
11 import java.util.concurrent.Executors;
12 import java.util.function.Consumer;
16 @EnableConfigurationProperties({ KafkaProperties.class, ApplicationProperties.class })
17 public class ApplicationConfiguration
/**
 * Supplies a {@code java.util.function.Consumer} that accepts
 * {@code ConsumerRecord<String, Long>} instances.
 *
 * NOTE(review): the method body and any annotation are not visible in this
 * excerpt, so what the returned function does with a record cannot be stated
 * here. The return type matches the {@code handler} parameter of
 * {@code endlessConsumer(...)}; presumably this is what gets injected there
 * -- TODO confirm.
 *
 * @return a function taking a {@code ConsumerRecord<String, Long>}
 */
20 public Consumer<ConsumerRecord<String, Long>> consumer()
/**
 * Assembles an {@code EndlessConsumer<String, Long>} from its collaborators.
 *
 * Only part of the constructor call is visible in this excerpt: the client id
 * and the topic are passed to it. How the remaining parameters are used is
 * not shown -- presumably they are forwarded to the constructor as well;
 * TODO confirm against the full file.
 *
 * @param kafkaConsumer the Kafka client consumer; its type is written fully
 *        qualified because the simple name {@code Consumer} is already taken
 *        by the {@code java.util.function.Consumer} import
 * @param executor an {@code ExecutorService}
 * @param handler a function accepting {@code ConsumerRecord<String, Long>}
 * @param kafkaProperties Spring Boot Kafka properties; the consumer client id
 *        is read from them
 * @param applicationProperties application properties; the topic is read
 *        from them
 * @return an {@code EndlessConsumer<String, Long>}
 */
29 public EndlessConsumer<String, Long> endlessConsumer(
30 org.apache.kafka.clients.consumer.Consumer<String, Long> kafkaConsumer,
31 ExecutorService executor,
32 Consumer<ConsumerRecord<String, Long>> handler,
33 KafkaProperties kafkaProperties,
34 ApplicationProperties applicationProperties)
37 new EndlessConsumer<>(
// Client id taken from the consumer section of the Kafka properties.
39 kafkaProperties.getConsumer().getClientId(),
// Topic name taken from the application's own properties.
40 applicationProperties.getTopic(),
/**
 * Provides the {@code ExecutorService}, created with
 * {@code Executors.newSingleThreadExecutor()}.
 *
 * NOTE(review): no annotation is visible on this method in the excerpt;
 * presumably it is registered as a bean like {@code kafkaConsumer(...)}
 * -- TODO confirm.
 *
 * @return a newly created single-thread executor
 */
46 public ExecutorService executor()
48 return Executors.newSingleThreadExecutor();
/**
 * Declares the Kafka client consumer as a bean, obtained from the given
 * factory via {@code createConsumer()}.
 *
 * The annotation names {@code close} as the bean's destroy method.
 *
 * @param factory the {@code ConsumerFactory<String, Long>} used to create
 *        the consumer
 * @return the consumer returned by {@code factory.createConsumer()}
 */
51 @Bean(destroyMethod = "close")
52 public org.apache.kafka.clients.consumer.Consumer<String, Long> kafkaConsumer(ConsumerFactory<String, Long> factory)
54 return factory.createConsumer();