1 package de.juplo.kafka;
3 import org.apache.kafka.common.serialization.ByteArraySerializer;
4 import org.apache.kafka.common.serialization.StringSerializer;
5 import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
6 import org.springframework.boot.context.properties.EnableConfigurationProperties;
7 import org.springframework.context.annotation.Bean;
8 import org.springframework.context.annotation.Configuration;
11 import java.util.Optional;
13 import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
14 import org.springframework.kafka.core.DefaultKafkaProducerFactory;
15 import org.springframework.kafka.core.KafkaOperations;
16 import org.springframework.kafka.core.KafkaTemplate;
17 import org.springframework.kafka.core.ProducerFactory;
18 import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
19 import org.springframework.kafka.listener.DefaultErrorHandler;
20 import org.springframework.kafka.support.serializer.DelegatingByTypeSerializer;
21 import org.springframework.kafka.support.serializer.JsonSerializer;
22 import org.springframework.util.backoff.FixedBackOff;
/**
 * Declares the factory methods for the application's Kafka-related objects:
 * record handler, results holder, rebalance listener, consumer, producer
 * factory, template, dead-letter recoverer and error handler.
 *
 * Enables configuration-property binding for {@link KafkaProperties} and
 * {@code ApplicationProperties}.
 *
 * NOTE(review): {@code @Configuration} and {@code @Bean} are imported, but the
 * annotations themselves are not visible in this view - confirm that the class
 * and its factory methods are annotated accordingly.
 */
26 @EnableConfigurationProperties({ KafkaProperties.class, ApplicationProperties.class })
27 public class ApplicationConfiguration
/**
 * Builds the {@code ApplicationRecordHandler}.
 *
 * @param adderResults          results holder; NOTE(review): its use is not
 *                              visible in this view, presumably it is handed
 *                              to the handler's constructor - confirm
 * @param kafkaProperties       source of the Kafka client id
 * @param applicationProperties source of the (possibly unset) throttle value
 * @return a new handler that receives the optional throttle and the client id
 */
30 public ApplicationRecordHandler applicationRecordHandler(
31 AdderResults adderResults,
32 KafkaProperties kafkaProperties,
33 ApplicationProperties applicationProperties)
35 return new ApplicationRecordHandler(
// A null throttle is turned into an empty Optional instead of being passed on as null.
37 Optional.ofNullable(applicationProperties.getThrottle()),
38 kafkaProperties.getClientId());
/**
 * Creates the {@code AdderResults} instance through its no-argument constructor.
 *
 * @return a new {@code AdderResults}
 */
42 public AdderResults adderResults()
44 return new AdderResults();
/**
 * Builds the {@code ApplicationRebalanceListener}.
 *
 * NOTE(review): only the last constructor argument (the Kafka client id) is
 * visible in this view; presumably {@code recordHandler}, {@code adderResults}
 * and {@code stateRepository} are passed before it - confirm.
 *
 * @param recordHandler   the application's record handler
 * @param adderResults    the results holder
 * @param stateRepository the state repository
 * @param kafkaProperties source of the Kafka client id
 * @return a new rebalance listener
 */
48 public ApplicationRebalanceListener rebalanceListener(
49 ApplicationRecordHandler recordHandler,
50 AdderResults adderResults,
51 StateRepository stateRepository,
52 KafkaProperties kafkaProperties)
54 return new ApplicationRebalanceListener(
58 kafkaProperties.getClientId());
/**
 * Builds the {@code EndlessConsumer}.
 *
 * NOTE(review): the construction expression is only partly visible in this
 * view; the Kafka client id is one of the arguments passed, and presumably
 * {@code endpointRegistry} and {@code recordHandler} are passed as well -
 * confirm.
 *
 * @param recordHandler    handler for the consumed records
 * @param kafkaProperties  source of the Kafka client id
 * @param endpointRegistry registry of the Kafka listener endpoints
 * @return the consumer instance
 */
62 public EndlessConsumer endlessConsumer(
63 RecordHandler recordHandler,
64 KafkaProperties kafkaProperties,
65 KafkaListenerEndpointRegistry endpointRegistry)
69 kafkaProperties.getClientId(),
/**
 * Builds the producer factory used for sending messages.
 *
 * The factory is configured from the producer section of the given
 * {@link KafkaProperties}. Keys are serialized with a {@link StringSerializer};
 * values are serialized by a {@link DelegatingByTypeSerializer} that selects
 * the serializer by the type of the value.
 *
 * @param properties the Kafka properties whose producer settings are used
 * @return a new {@link DefaultKafkaProducerFactory}
 */
75 public ProducerFactory<String, Object> producerFactory(
76 KafkaProperties properties)
78 return new DefaultKafkaProducerFactory<>(
79 properties.getProducer().buildProperties(),
80 new StringSerializer(),
81 new DelegatingByTypeSerializer(
// Type-to-serializer pairs: raw bytes are passed through as they are,
// the two message types are written as JSON.
83 byte[].class, new ByteArraySerializer(),
84 MessageAddNumber.class, new JsonSerializer<>(),
85 MessageCalculateSum.class, new JsonSerializer<>())));
/**
 * Creates a {@link KafkaTemplate} on top of the given producer factory.
 *
 * @param producerFactory the factory the template obtains its producers from
 * @return a new {@link KafkaTemplate}
 */
89 public KafkaTemplate<String, Object> kafkaTemplate(
90 ProducerFactory<String, Object> producerFactory)
92 return new KafkaTemplate<>(producerFactory);
/**
 * Creates a {@link DeadLetterPublishingRecoverer} backed by the given
 * {@link KafkaOperations}.
 *
 * @param kafkaTemplate the Kafka operations handed to the recoverer
 * @return a new {@link DeadLetterPublishingRecoverer}
 */
96 public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(
97 KafkaOperations<?, ?> kafkaTemplate)
99 return new DeadLetterPublishingRecoverer(kafkaTemplate);
/**
 * Creates the {@link DefaultErrorHandler} with a {@link FixedBackOff} whose
 * two arguments are both zero.
 *
 * NOTE(review): the first constructor argument is not visible in this view;
 * presumably it is {@code recoverer} - confirm.
 *
 * @param recoverer the dead-letter publishing recoverer
 * @return a new {@link DefaultErrorHandler}
 */
103 public DefaultErrorHandler errorHandler(
104 DeadLetterPublishingRecoverer recoverer)
106 return new DefaultErrorHandler(
// Per the FixedBackOff(interval, maxAttempts) constructor, both values are
// zero, i.e. no delay and no retry attempts.
// NOTE(review): the lowercase long suffix in "0l" is easy to misread as "01";
// consider "0L".
108 new FixedBackOff(0l, 0l));