DLT-Konfig für `spring-consumer`, die auch mit Poison Pills umgehen kann
[demos/kafka/training] / src / main / java / de / juplo / kafka / Application.java
1 package de.juplo.kafka;
2
3 import org.apache.kafka.clients.producer.ProducerConfig;
4 import org.apache.kafka.common.serialization.ByteArraySerializer;
5 import org.apache.kafka.common.serialization.StringSerializer;
6 import org.springframework.boot.SpringApplication;
7 import org.springframework.boot.autoconfigure.SpringBootApplication;
8 import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
9 import org.springframework.boot.context.properties.EnableConfigurationProperties;
10 import org.springframework.context.annotation.Bean;
11 import org.springframework.kafka.core.DefaultKafkaProducerFactory;
12 import org.springframework.kafka.core.KafkaOperations;
13 import org.springframework.kafka.core.KafkaTemplate;
14 import org.springframework.kafka.core.ProducerFactory;
15 import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
16 import org.springframework.kafka.listener.DefaultErrorHandler;
17 import org.springframework.kafka.support.serializer.DelegatingByTypeSerializer;
18 import org.springframework.kafka.support.serializer.JsonSerializer;
19 import org.springframework.util.backoff.FixedBackOff;
20
21 import java.util.Map;
22
23
24 @SpringBootApplication
25 @EnableConfigurationProperties({ KafkaProperties.class, ApplicationProperties.class })
26 public class Application
27 {
28   @Bean
29   public ProducerFactory<String, Object> producerFactory(
30     KafkaProperties properties)
31   {
32     Map<String, Object> map = Map.of(
33       ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers());
34     return new DefaultKafkaProducerFactory<>(
35       map,
36       new StringSerializer(),
37       new DelegatingByTypeSerializer(
38         Map.of(
39           byte[].class, new ByteArraySerializer(),
40           MessageAddNumber.class, new JsonSerializer<>(),
41           MessageCalculateSum.class, new JsonSerializer<>())));
42   }
43
44   @Bean
45   public KafkaTemplate<String, Object> kafkaTemplate(
46     ProducerFactory<String, Object> producerFactory)
47   {
48     return new KafkaTemplate<>(producerFactory);
49   }
50
51   @Bean
52   public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(
53     KafkaOperations<?, ?> kafkaTemplate)
54   {
55     return new DeadLetterPublishingRecoverer(kafkaTemplate);
56   }
57
58   @Bean
59   public DefaultErrorHandler errorHandler(
60     DeadLetterPublishingRecoverer recoverer)
61   {
62     return new DefaultErrorHandler(
63       recoverer,
64       new FixedBackOff(0l, 0l));
65   }
66
67
68   public static void main(String[] args)
69   {
70     SpringApplication.run(Application.class, args);
71   }
72 }