Springify: Gemeinsame DLQ für Poison Pills und Fachlogik-Fehler konfiguriert
[demos/kafka/training] / src / main / java / de / juplo / kafka / ApplicationConfiguration.java
1 package de.juplo.kafka;
2
3 import org.apache.kafka.clients.consumer.ConsumerRecord;
4 import org.apache.kafka.common.TopicPartition;
5 import org.apache.kafka.common.serialization.ByteArraySerializer;
6 import org.apache.kafka.common.serialization.StringSerializer;
7 import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
8 import org.springframework.boot.context.properties.EnableConfigurationProperties;
9 import org.springframework.context.annotation.Bean;
10 import org.springframework.context.annotation.Configuration;
11 import org.springframework.kafka.core.*;
12 import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
13 import org.springframework.kafka.listener.DefaultErrorHandler;
14 import org.springframework.kafka.support.serializer.DelegatingByTypeSerializer;
15 import org.springframework.kafka.support.serializer.JsonSerializer;
16 import org.springframework.util.backoff.FixedBackOff;
17
18 import java.util.Map;
19 import java.util.function.Consumer;
20
21
22 @Configuration
23 @EnableConfigurationProperties({ KafkaProperties.class, ApplicationProperties.class })
24 public class ApplicationConfiguration
25 {
26   @Bean
27   public Consumer<ConsumerRecord<String, ClientMessage>> consumer()
28   {
29     return (record) ->
30     {
31       // Handle record
32     };
33   }
34
35   @Bean
36   public ProducerFactory<String, Object> producerFactory(KafkaProperties properties) {
37     return new DefaultKafkaProducerFactory<>(
38         properties.getProducer().buildProperties(),
39         new StringSerializer(),
40         new DelegatingByTypeSerializer(Map.of(
41             byte[].class, new ByteArraySerializer(),
42             ClientMessage.class, new JsonSerializer<>())));
43   }
44
45   @Bean
46   public KafkaTemplate<String, Object> kafkaTemplate(
47       ProducerFactory<String, Object> producerFactory) {
48
49     return new KafkaTemplate<>(producerFactory);
50   }
51
52   @Bean
53   public DeadLetterPublishingRecoverer recoverer(
54       ApplicationProperties properties,
55       KafkaOperations<?, ?> template)
56   {
57     return new DeadLetterPublishingRecoverer(
58         template,
59         (record, exception) -> new TopicPartition(properties.getDlqTopic(), record.partition()));
60   }
61
62   @Bean
63   public DefaultErrorHandler errorHandler(DeadLetterPublishingRecoverer recoverer)
64   {
65     return new DefaultErrorHandler(recoverer, new FixedBackOff(0l, 0l));
66   }
67
68   @Bean(destroyMethod = "close")
69   public org.apache.kafka.clients.consumer.Consumer<String, ClientMessage> kafkaConsumer(ConsumerFactory<String, ClientMessage> factory)
70   {
71     return factory.createConsumer();
72   }
73 }