package de.juplo.kafka;
-import org.springframework.beans.factory.annotation.Autowired;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
+import org.springframework.kafka.annotation.EnableKafka;
+import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
+import org.springframework.kafka.core.KafkaOperations;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.core.ProducerFactory;
+import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
+import org.springframework.kafka.listener.DefaultErrorHandler;
+import org.springframework.kafka.support.serializer.DelegatingByTypeSerializer;
+import org.springframework.kafka.support.serializer.JsonSerializer;
+import org.springframework.util.backoff.FixedBackOff;
-import java.util.concurrent.Executors;
+import java.util.Map;
+import java.util.Optional;
@SpringBootApplication
-@EnableConfigurationProperties(ApplicationProperties.class)
+@Slf4j
+@EnableConfigurationProperties({ KafkaProperties.class, ApplicationProperties.class })
+@EnableKafka
public class Application
{
- @Autowired
- ApplicationProperties properties;
+ // Builds the record handler from the shared results holder, the (optional)
+ // throttle setting, and the consumer group id taken from Spring Boot's
+ // KafkaProperties rather than the application's own config.
+ @Bean
+ public ApplicationRecordHandler applicationRecordHandler(
+ AdderResults adderResults,
+ KafkaProperties kafkaProperties,
+ ApplicationProperties applicationProperties)
+ {
+ return new ApplicationRecordHandler(
+ adderResults,
+ // getThrottle() may be null when no throttle is configured — wrapped as empty Optional
+ Optional.ofNullable(applicationProperties.getThrottle()),
+ kafkaProperties.getConsumer().getGroupId());
+ }
+ // Shared results holder; injected into both the record handler and the
+ // rebalance listener (see the beans that take it as a dependency).
+ @Bean
+ public AdderResults adderResults()
+ {
+ return new AdderResults();
+ }
@Bean
- public EndlessConsumer consumer()
+ // Replaces the hand-rolled EndlessConsumer (removed below) with a rebalance
+ // listener wired to the handler, results and state repository; the group id
+ // again comes from Spring Boot's KafkaProperties.
+ public ApplicationRebalanceListener rebalanceListener(
+ ApplicationRecordHandler recordHandler,
+ AdderResults adderResults,
+ StateRepository stateRepository,
+ KafkaProperties kafkaProperties)
{
- EndlessConsumer consumer =
- new EndlessConsumer(
- Executors.newFixedThreadPool(1),
- properties.getBootstrapServer(),
- properties.getGroupId(),
- properties.getClientId(),
- properties.getTopic(),
- properties.getAutoOffsetReset());
+ return new ApplicationRebalanceListener(
+ recordHandler,
+ adderResults,
+ stateRepository,
+ kafkaProperties.getConsumer().getGroupId());
+ }
- consumer.start();
+ @Bean
+ ApplicationHealthIndicator applicationHealthIndicator(
+ KafkaListenerEndpointRegistry registry,
+ KafkaProperties properties)
+ {
+ return new ApplicationHealthIndicator(
+ properties.getConsumer().getGroupId(),
+ registry);
+ }
- return consumer;
+ // Producer factory: String keys, and values serialized by concrete type —
+ // raw byte[] passes through unchanged, the two message types are written as
+ // JSON. NOTE(review): the byte[] mapping presumably exists so the
+ // DeadLetterPublishingRecoverer (below) can republish raw record values
+ // through this same producer — confirm against the recoverer's usage.
+ @Bean
+ public ProducerFactory<String, Object> producerFactory(
+ KafkaProperties properties)
+ {
+ return new DefaultKafkaProducerFactory<>(
+ properties.getProducer().buildProperties(),
+ new StringSerializer(),
+ new DelegatingByTypeSerializer(
+ Map.of(
+ byte[].class, new ByteArraySerializer(),
+ MessageAddNumber.class, new JsonSerializer<>(),
+ MessageCalculateSum.class, new JsonSerializer<>())));
}
+ // KafkaTemplate over the type-dispatching producer factory defined above;
+ // also satisfies the KafkaOperations<?, ?> dependency of the dead-letter
+ // recoverer bean below.
+ @Bean
+ public KafkaTemplate<String, Object> kafkaTemplate(
+ ProducerFactory<String, Object> producerFactory)
+ {
+ return new KafkaTemplate<>(producerFactory);
+ }
+
+ // Recoverer that republishes failed records via the injected KafkaOperations
+ // (the KafkaTemplate bean). Uses the recoverer's default destination
+ // resolution — no custom topic resolver is supplied here.
+ @Bean
+ public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(
+ KafkaOperations<?, ?> kafkaTemplate)
+ {
+ return new DeadLetterPublishingRecoverer(kafkaTemplate);
+ }
+
+ @Bean
+ public DefaultErrorHandler errorHandler(
+ DeadLetterPublishingRecoverer recoverer)
+ {
+ return new DefaultErrorHandler(
+ recoverer,
+ new FixedBackOff(0l, 0l));
+ }
+
+
public static void main(String[] args)
{
SpringApplication.run(Application.class, args);