1 package de.juplo.kafka;
3 import org.apache.kafka.clients.consumer.ConsumerRecord;
4 import org.apache.kafka.common.TopicPartition;
5 import org.apache.kafka.common.serialization.ByteArraySerializer;
6 import org.apache.kafka.common.serialization.StringSerializer;
7 import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
8 import org.springframework.boot.context.properties.EnableConfigurationProperties;
9 import org.springframework.context.annotation.Bean;
10 import org.springframework.context.annotation.Configuration;
11 import org.springframework.kafka.core.*;
12 import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
13 import org.springframework.kafka.listener.DefaultErrorHandler;
14 import org.springframework.kafka.support.serializer.DelegatingByTypeSerializer;
15 import org.springframework.kafka.support.serializer.JsonSerializer;
16 import org.springframework.util.backoff.FixedBackOff;
19 import java.util.function.Consumer;
23 @EnableConfigurationProperties({ KafkaProperties.class, ApplicationProperties.class })
24 public class ApplicationConfiguration
27 public Consumer<ConsumerRecord<String, ClientMessage>> consumer()
36 public ProducerFactory<String, Object> producerFactory(KafkaProperties properties) {
37 return new DefaultKafkaProducerFactory<>(
38 properties.getProducer().buildProperties(),
39 new StringSerializer(),
40 new DelegatingByTypeSerializer(Map.of(
41 byte[].class, new ByteArraySerializer(),
42 ClientMessage.class, new JsonSerializer<>())));
46 public KafkaTemplate<String, Object> kafkaTemplate(
47 ProducerFactory<String, Object> producerFactory) {
49 return new KafkaTemplate<>(producerFactory);
53 public DeadLetterPublishingRecoverer recoverer(
54 ApplicationProperties properties,
55 KafkaOperations<?, ?> template)
57 return new DeadLetterPublishingRecoverer(
59 (record, exception) -> new TopicPartition(properties.getDlqTopic(), record.partition()));
63 public DefaultErrorHandler errorHandler(DeadLetterPublishingRecoverer recoverer)
65 return new DefaultErrorHandler(recoverer, new FixedBackOff(0l, 0l));
68 @Bean(destroyMethod = "close")
69 public org.apache.kafka.clients.consumer.Consumer<String, ClientMessage> kafkaConsumer(ConsumerFactory<String, ClientMessage> factory)
71 return factory.createConsumer();