1 package de.juplo.kafka;
3 import org.apache.kafka.clients.producer.ProducerConfig;
4 import org.apache.kafka.common.serialization.ByteArraySerializer;
5 import org.apache.kafka.common.serialization.StringSerializer;
6 import org.springframework.boot.SpringApplication;
7 import org.springframework.boot.autoconfigure.SpringBootApplication;
8 import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
9 import org.springframework.boot.context.properties.EnableConfigurationProperties;
10 import org.springframework.context.annotation.Bean;
11 import org.springframework.kafka.core.DefaultKafkaProducerFactory;
12 import org.springframework.kafka.core.KafkaOperations;
13 import org.springframework.kafka.core.KafkaTemplate;
14 import org.springframework.kafka.core.ProducerFactory;
15 import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
16 import org.springframework.kafka.listener.DefaultErrorHandler;
17 import org.springframework.kafka.support.serializer.DelegatingByTypeSerializer;
18 import org.springframework.kafka.support.serializer.JsonSerializer;
19 import org.springframework.util.backoff.FixedBackOff;
24 @SpringBootApplication
25 @EnableConfigurationProperties({ KafkaProperties.class, ApplicationProperties.class })
26 public class Application
29 public ProducerFactory<String, Object> producerFactory(
30 KafkaProperties properties)
32 Map<String, Object> map = Map.of(
33 ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers());
34 return new DefaultKafkaProducerFactory<>(
36 new StringSerializer(),
37 new DelegatingByTypeSerializer(
39 byte[].class, new ByteArraySerializer(),
40 MessageAddNumber.class, new JsonSerializer<>(),
41 MessageCalculateSum.class, new JsonSerializer<>())));
45 public KafkaTemplate<String, Object> kafkaTemplate(
46 ProducerFactory<String, Object> producerFactory)
48 return new KafkaTemplate<>(producerFactory);
52 public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(
53 KafkaOperations<?, ?> kafkaTemplate)
55 return new DeadLetterPublishingRecoverer(kafkaTemplate);
59 public DefaultErrorHandler errorHandler(
60 DeadLetterPublishingRecoverer recoverer)
62 return new DefaultErrorHandler(
64 new FixedBackOff(0l, 0l));
68 public static void main(String[] args)
70 SpringApplication.run(Application.class, args);