1 package de.juplo.kafka;
3 import org.apache.kafka.clients.consumer.ConsumerRecord;
4 import org.apache.kafka.common.TopicPartition;
5 import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
6 import org.springframework.boot.context.properties.EnableConfigurationProperties;
7 import org.springframework.context.annotation.Bean;
8 import org.springframework.context.annotation.Configuration;
9 import org.springframework.kafka.core.ConsumerFactory;
10 import org.springframework.kafka.core.KafkaOperations;
11 import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
12 import org.springframework.kafka.listener.DefaultErrorHandler;
13 import org.springframework.util.backoff.FixedBackOff;
15 import java.util.function.Consumer;
19 @EnableConfigurationProperties({ KafkaProperties.class, ApplicationProperties.class })
20 public class ApplicationConfiguration
23 public Consumer<ConsumerRecord<String, ClientMessage>> consumer()
32 public DeadLetterPublishingRecoverer recoverer(
33 ApplicationProperties properties,
34 KafkaOperations<?, ?> template)
36 return new DeadLetterPublishingRecoverer(
38 (record, exception) -> new TopicPartition(properties.getDlqTopic(), record.partition()));
42 public DefaultErrorHandler errorHandler(DeadLetterPublishingRecoverer recoverer)
44 return new DefaultErrorHandler(recoverer, new FixedBackOff(0l, 0l));
47 @Bean(destroyMethod = "close")
48 public org.apache.kafka.clients.consumer.Consumer<String, ClientMessage> kafkaConsumer(ConsumerFactory<String, ClientMessage> factory)
50 return factory.createConsumer();